Use streaming read I/O in heap amcheck
authorMelanie Plageman <melanieplageman@gmail.com>
Thu, 27 Mar 2025 18:02:40 +0000 (14:02 -0400)
committerMelanie Plageman <melanieplageman@gmail.com>
Thu, 27 Mar 2025 18:04:14 +0000 (14:04 -0400)
Instead of directly invoking ReadBuffer() for each unskippable block in
the heap relation, verify_heapam() now uses the read stream API to
acquire the next buffer to check for corruption.

Author: Matheus Alcantara <matheusssilv97@gmail.com>
Co-authored-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Kirill Reshke <reshkekirill@gmail.com>
Reviewed-by: jian he <jian.universality@gmail.com>
Discussion: https://postgr.es/m/flat/CAFY6G8eLyz7%2BsccegZYFj%3D5tAUR-GZ9uEq4Ch5gvwKqUwb_hCA%40mail.gmail.com

contrib/amcheck/verify_heapam.c
src/tools/pgindent/typedefs.list

index 827312306f6043ff98716e6ee59c200d71dfae40..9e4d558436bac50d9cf78607a87a05b9c9b6976a 100644 (file)
@@ -25,6 +25,7 @@
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
+#include "storage/read_stream.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/rel.h"
@@ -118,7 +119,10 @@ typedef struct HeapCheckContext
    Relation    valid_toast_index;
    int         num_toast_indexes;
 
-   /* Values for iterating over pages in the relation */
+   /*
+    * Values for iterating over pages in the relation. `blkno` is the most
+    * recent block in the buffer yielded by the read stream API.
+    */
    BlockNumber blkno;
    BufferAccessStrategy bstrategy;
    Buffer      buffer;
@@ -153,7 +157,32 @@ typedef struct HeapCheckContext
    Tuplestorestate *tupstore;
 } HeapCheckContext;
 
+/*
+ * The per-relation data provided to the read stream API for heap amcheck to
+ * use in its callback for the SKIP_PAGES_ALL_FROZEN and
+ * SKIP_PAGES_ALL_VISIBLE options.
+ */
+typedef struct HeapCheckReadStreamData
+{
+   /*
+    * `range` is used by all SkipPages options. SKIP_PAGES_NONE uses the
+    * default read stream callback, block_range_read_stream_cb(), which takes
+    * a BlockRangeReadStreamPrivate as its callback_private_data. `range`
+    * keeps track of the current block number across
+    * read_stream_next_buffer() invocations.
+    */
+   BlockRangeReadStreamPrivate range;
+   SkipPages   skip_option;
+   Relation    rel;
+   Buffer     *vmbuffer;
+} HeapCheckReadStreamData;
+
+
 /* Internal implementation */
+static BlockNumber heapcheck_read_stream_next_unskippable(ReadStream *stream,
+                                                         void *callback_private_data,
+                                                         void *per_buffer_data);
+
 static void check_tuple(HeapCheckContext *ctx,
                        bool *xmin_commit_status_ok,
                        XidCommitStatus *xmin_commit_status);
@@ -231,6 +260,11 @@ verify_heapam(PG_FUNCTION_ARGS)
    BlockNumber last_block;
    BlockNumber nblocks;
    const char *skip;
+   ReadStream *stream;
+   int         stream_flags;
+   ReadStreamBlockNumberCB stream_cb;
+   void       *stream_data;
+   HeapCheckReadStreamData stream_skip_data;
 
    /* Check supplied arguments */
    if (PG_ARGISNULL(0))
@@ -404,7 +438,35 @@ verify_heapam(PG_FUNCTION_ARGS)
    if (TransactionIdIsNormal(ctx.relfrozenxid))
        ctx.oldest_xid = ctx.relfrozenxid;
 
-   for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+   /* Now that `ctx` is set up, set up the read stream */
+   stream_skip_data.range.current_blocknum = first_block;
+   stream_skip_data.range.last_exclusive = last_block + 1;
+   stream_skip_data.skip_option = skip_option;
+   stream_skip_data.rel = ctx.rel;
+   stream_skip_data.vmbuffer = &vmbuffer;
+
+   if (skip_option == SKIP_PAGES_NONE)
+   {
+       stream_cb = block_range_read_stream_cb;
+       stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
+       stream_data = &stream_skip_data.range;
+   }
+   else
+   {
+       stream_cb = heapcheck_read_stream_next_unskippable;
+       stream_flags = READ_STREAM_DEFAULT;
+       stream_data = &stream_skip_data;
+   }
+
+   stream = read_stream_begin_relation(stream_flags,
+                                       ctx.bstrategy,
+                                       ctx.rel,
+                                       MAIN_FORKNUM,
+                                       stream_cb,
+                                       stream_data,
+                                       0);
+
+   while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
    {
        OffsetNumber maxoff;
        OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +479,11 @@ verify_heapam(PG_FUNCTION_ARGS)
 
        memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
 
-       /* Optionally skip over all-frozen or all-visible blocks */
-       if (skip_option != SKIP_PAGES_NONE)
-       {
-           int32       mapbits;
-
-           mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
-                                                      &vmbuffer);
-           if (skip_option == SKIP_PAGES_ALL_FROZEN)
-           {
-               if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
-                   continue;
-           }
-
-           if (skip_option == SKIP_PAGES_ALL_VISIBLE)
-           {
-               if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
-                   continue;
-           }
-       }
-
-       /* Read and lock the next page. */
-       ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
-                                       RBM_NORMAL, ctx.bstrategy);
+       /* Lock the next page. */
+       Assert(BufferIsValid(ctx.buffer));
        LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+       ctx.blkno = BufferGetBlockNumber(ctx.buffer);
        ctx.page = BufferGetPage(ctx.buffer);
 
        /* Perform tuple checks */
@@ -799,6 +842,10 @@ verify_heapam(PG_FUNCTION_ARGS)
            break;
    }
 
+   /* Ensure that the stream is completely read */
+   Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+   read_stream_end(stream);
+
    if (vmbuffer != InvalidBuffer)
        ReleaseBuffer(vmbuffer);
 
@@ -815,6 +862,42 @@ verify_heapam(PG_FUNCTION_ARGS)
    PG_RETURN_NULL();
 }
 
+/*
+ * Heap amcheck's read stream callback for getting the next unskippable block.
+ * This callback is only used when 'all-visible' or 'all-frozen' is provided
+ * as the skip option to verify_heapam(). With the default 'none',
+ * block_range_read_stream_cb() is used instead.
+ */
+static BlockNumber
+heapcheck_read_stream_next_unskippable(ReadStream *stream,
+                                      void *callback_private_data,
+                                      void *per_buffer_data)
+{
+   HeapCheckReadStreamData *p = callback_private_data;
+
+   /* Loops over [current_blocknum, last_exclusive) blocks */
+   for (BlockNumber i; (i = p->range.current_blocknum++) < p->range.last_exclusive;)
+   {
+       uint8       mapbits = visibilitymap_get_status(p->rel, i, p->vmbuffer);
+
+       if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+       {
+           if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+               continue;
+       }
+
+       if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+       {
+           if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+               continue;
+       }
+
+       return i;
+   }
+
+   return InvalidBlockNumber;
+}
+
 /*
  * Shared internal implementation for report_corruption and
  * report_toast_corruption.
index 9442a4841aa3b13e1323576a49960ca8036513a9..1279b69422a59e402b160777928915ed5e1fb181 100644 (file)
@@ -1169,6 +1169,7 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapCheckReadStreamData
 HeapPageFreeze
 HeapScanDesc
 HeapTuple