Refactor XLogReadRecord(), adding XLogBeginRead() function.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Sun, 26 Jan 2020 09:39:00 +0000 (11:39 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Sun, 26 Jan 2020 09:39:00 +0000 (11:39 +0200)
The signature of XLogReadRecord() required the caller to pass the starting
WAL position as argument, or InvalidXLogRecPtr to continue reading at the
end of previous record. That's slightly awkward to the callers, as most
of them don't want to randomly jump around in the WAL stream, but start
reading at one position and then read everything from that point onwards.
Remove the 'RecPtr' argument and add a new function XLogBeginRead() to
specify the starting position instead. That's more convenient for the
callers. Also, xlogreader holds state that is reset when you change the
starting position, so having a separate function for doing that feels like
a more natural fit.

This changes XLogFindNextRecord() function so that it doesn't reset the
xlogreader's state to what it was before the call anymore. Instead, it
positions the xlogreader to the found record, like XLogBeginRead().

Reviewed-by: Kyotaro Horiguchi, Alvaro Herrera
Discussion: https://www.postgresql.org/message-id/5382a7a3-debe-be31-c860-cb810c08f366%40iki.fi

src/backend/access/transam/twophase.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogreader.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/bin/pg_rewind/parsexlog.c
src/bin/pg_waldump/pg_waldump.c
src/include/access/xlogreader.h

index 6f7ee0c947d953d2dd49d2be535b5a0d128d8f02..5adf956f4133e98d836ac295d5e51748f644df05 100644 (file)
@@ -1338,7 +1338,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
                 errmsg("out of memory"),
                 errdetail("Failed while allocating a WAL reading processor.")));
 
-   record = XLogReadRecord(xlogreader, lsn, &errormsg);
+   XLogBeginRead(xlogreader, lsn);
+   record = XLogReadRecord(xlogreader, &errormsg);
    if (record == NULL)
        ereport(ERROR,
                (errcode_for_file_access(),
index 7f4f784c0eb64216397b4c1af8bf3129d07b141f..882d5e8a73fc8e6af7f6d5d36815df48164c5627 100644 (file)
@@ -897,7 +897,7 @@ static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
                              int emode, bool fetching_ckpt);
 static void CheckRecoveryConsistency(void);
 static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
@@ -4246,17 +4246,17 @@ CleanupBackupHistory(void)
 }
 
 /*
- * Attempt to read an XLOG record.
+ * Attempt to read the next XLOG record.
  *
- * If RecPtr is valid, try to read a record at that position.  Otherwise
- * try to read a record just after the last one previously read.
+ * Before first call, the reader needs to be positioned to the first record
+ * by calling XLogBeginRead().
  *
  * If no valid record is available, returns NULL, or fails if emode is PANIC.
  * (emode must be either PANIC, LOG). In standby mode, retries until a valid
  * record is available.
  */
 static XLogRecord *
-ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+ReadRecord(XLogReaderState *xlogreader, int emode,
           bool fetching_ckpt)
 {
    XLogRecord *record;
@@ -4265,7 +4265,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
    /* Pass through parameters to XLogPageRead */
    private->fetching_ckpt = fetching_ckpt;
    private->emode = emode;
-   private->randAccess = (RecPtr != InvalidXLogRecPtr);
+   private->randAccess = (xlogreader->ReadRecPtr != InvalidXLogRecPtr);
 
    /* This is the first attempt to read this page. */
    lastSourceFailed = false;
@@ -4274,7 +4274,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
    {
        char       *errormsg;
 
-       record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+       record = XLogReadRecord(xlogreader, &errormsg);
        ReadRecPtr = xlogreader->ReadRecPtr;
        EndRecPtr = xlogreader->EndRecPtr;
        if (record == NULL)
@@ -4292,8 +4292,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
             * shouldn't loop anymore in that case.
             */
            if (errormsg)
-               ereport(emode_for_corrupt_record(emode,
-                                                RecPtr ? RecPtr : EndRecPtr),
+               ereport(emode_for_corrupt_record(emode, EndRecPtr),
                        (errmsg_internal("%s", errormsg) /* already translated */ ));
        }
 
@@ -4311,8 +4310,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
                                       wal_segment_size);
            XLogFileName(fname, xlogreader->seg.ws_tli, segno,
                         wal_segment_size);
-           ereport(emode_for_corrupt_record(emode,
-                                            RecPtr ? RecPtr : EndRecPtr),
+           ereport(emode_for_corrupt_record(emode, EndRecPtr),
                    (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
                            xlogreader->latestPageTLI,
                            fname,
@@ -6427,7 +6425,8 @@ StartupXLOG(void)
             */
            if (checkPoint.redo < checkPointLoc)
            {
-               if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
+               XLogBeginRead(xlogreader, checkPoint.redo);
+               if (!ReadRecord(xlogreader, LOG, false))
                    ereport(FATAL,
                            (errmsg("could not find redo location referenced by checkpoint record"),
                             errhint("If you are restoring from a backup, touch \"%s/recovery.signal\" and add required recovery options.\n"
@@ -7034,12 +7033,13 @@ StartupXLOG(void)
        if (checkPoint.redo < RecPtr)
        {
            /* back up to find the record */
-           record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
+           XLogBeginRead(xlogreader, checkPoint.redo);
+           record = ReadRecord(xlogreader, PANIC, false);
        }
        else
        {
            /* just have to read next record after CheckPoint */
-           record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+           record = ReadRecord(xlogreader, LOG, false);
        }
 
        if (record != NULL)
@@ -7263,7 +7263,7 @@ StartupXLOG(void)
                }
 
                /* Else, try to fetch the next WAL record */
-               record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+               record = ReadRecord(xlogreader, LOG, false);
            } while (record != NULL);
 
            /*
@@ -7365,7 +7365,8 @@ StartupXLOG(void)
     * Re-fetch the last valid or last applied record, so we can identify the
     * exact endpoint of what we consider the valid portion of WAL.
     */
-   record = ReadRecord(xlogreader, LastRec, PANIC, false);
+   XLogBeginRead(xlogreader, LastRec);
+   record = ReadRecord(xlogreader, PANIC, false);
    EndOfLog = EndRecPtr;
 
    /*
@@ -8094,7 +8095,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
        return NULL;
    }
 
-   record = ReadRecord(xlogreader, RecPtr, LOG, true);
+   XLogBeginRead(xlogreader, RecPtr);
+   record = ReadRecord(xlogreader, LOG, true);
 
    if (record == NULL)
    {
index 3aa68127a3ec9258570a48a8bcbdf408e8c5f50c..32f02256edb24069dee9e18e66630293bd03c1cc 100644 (file)
@@ -218,11 +218,34 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
        snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
 }
 
+/*
+ * Begin reading WAL at 'RecPtr'.
+ *
+ * 'RecPtr' should point to the beginnning of a valid WAL record.  Pointing at
+ * the beginning of a page is also OK, if there is a new record right after
+ * the page header, i.e. not a continuation.
+ *
+ * This does not make any attempt to read the WAL yet, and hence cannot fail.
+ * If the starting address is not correct, the first call to XLogReadRecord()
+ * will error out.
+ */
+void
+XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
+{
+   Assert(!XLogRecPtrIsInvalid(RecPtr));
+
+   ResetDecoder(state);
+
+   /* Begin at the passed-in record pointer. */
+   state->EndRecPtr = RecPtr;
+   state->ReadRecPtr = InvalidXLogRecPtr;
+}
+
 /*
  * Attempt to read an XLOG record.
  *
- * If RecPtr is valid, try to read a record at that position.  Otherwise
- * try to read a record just after the last one previously read.
+ * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
+ * to XLogReadRecord().
  *
  * If the read_page callback fails to read the requested data, NULL is
  * returned.  The callback is expected to have reported the error; errormsg
@@ -235,8 +258,9 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
  * valid until the next call to XLogReadRecord.
  */
 XLogRecord *
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
+XLogReadRecord(XLogReaderState *state, char **errormsg)
 {
+   XLogRecPtr  RecPtr;
    XLogRecord *record;
    XLogRecPtr  targetPagePtr;
    bool        randAccess;
@@ -260,19 +284,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 
    ResetDecoder(state);
 
-   if (RecPtr == InvalidXLogRecPtr)
-   {
-       /* No explicit start point; read the record after the one we just read */
-       RecPtr = state->EndRecPtr;
+   RecPtr = state->EndRecPtr;
 
-       if (state->ReadRecPtr == InvalidXLogRecPtr)
-           randAccess = true;
+   if (state->ReadRecPtr != InvalidXLogRecPtr)
+   {
+       /* read the record after the one we just read */
 
        /*
-        * RecPtr is pointing to end+1 of the previous WAL record.  If we're
-        * at a page boundary, no more records can fit on the current page. We
-        * must skip over the page header, but we can't do that until we've
-        * read in the page, since the header size is variable.
+        * EndRecPtr is pointing to end+1 of the previous WAL record.  If
+        * we're at a page boundary, no more records can fit on the current
+        * page. We must skip over the page header, but we can't do that until
+        * we've read in the page, since the header size is variable.
         */
    }
    else
@@ -280,8 +302,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
        /*
         * Caller supplied a position to start at.
         *
-        * In this case, the passed-in record pointer should already be
-        * pointing to a valid record starting position.
+        * In this case, EndRecPtr should already be pointing to a valid
+        * record starting position.
         */
        Assert(XRecOffIsValid(RecPtr));
        randAccess = true;
@@ -899,14 +921,17 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 /*
  * Find the first record with an lsn >= RecPtr.
  *
- * Useful for checking whether RecPtr is a valid xlog address for reading, and
- * to find the first valid address after some address when dumping records for
- * debugging purposes.
+ * This is different from XLogBeginRead() in that RecPtr doesn't need to point
+ * to a valid record boundary.  Useful for checking whether RecPtr is a valid
+ * xlog address for reading, and to find the first valid address after some
+ * address when dumping records for debugging purposes.
+ *
+ * This positions the reader, like XLogBeginRead(), so that the next call to
+ * XLogReadRecord() will read the next valid record.
  */
 XLogRecPtr
 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 {
-   XLogReaderState saved_state = *state;
    XLogRecPtr  tmpRecPtr;
    XLogRecPtr  found = InvalidXLogRecPtr;
    XLogPageHeader header;
@@ -991,27 +1016,23 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
     * because either we're at the first record after the beginning of a page
     * or we just jumped over the remaining data of a continuation.
     */
-   while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
+   XLogBeginRead(state, tmpRecPtr);
+   while (XLogReadRecord(state, &errormsg) != NULL)
    {
-       /* continue after the record */
-       tmpRecPtr = InvalidXLogRecPtr;
-
        /* past the record we've found, break out */
        if (RecPtr <= state->ReadRecPtr)
        {
+           /* Rewind the reader to the beginning of the last record. */
            found = state->ReadRecPtr;
-           goto out;
+           XLogBeginRead(state, found);
+           return found;
        }
    }
 
 err:
-out:
-   /* Reset state to what we had before finding the record */
-   state->ReadRecPtr = saved_state.ReadRecPtr;
-   state->EndRecPtr = saved_state.EndRecPtr;
    XLogReaderInvalReadState(state);
 
-   return found;
+   return InvalidXLogRecPtr;
 }
 
 #endif                         /* FRONTEND */
index bdf4389a57c63122431115a1ab648fde164d1659..cf9320061882da79c5e3debb2504b6faaaff0fb0 100644 (file)
@@ -461,11 +461,10 @@ DecodingContextReady(LogicalDecodingContext *ctx)
 void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
-   XLogRecPtr  startptr;
    ReplicationSlot *slot = ctx->slot;
 
    /* Initialize from where to start reading WAL. */
-   startptr = slot->data.restart_lsn;
+   XLogBeginRead(ctx->reader, slot->data.restart_lsn);
 
    elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
         (uint32) (slot->data.restart_lsn >> 32),
@@ -478,14 +477,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
        char       *err = NULL;
 
        /* the read_page callback waits for new WAL */
-       record = XLogReadRecord(ctx->reader, startptr, &err);
+       record = XLogReadRecord(ctx->reader, &err);
        if (err)
            elog(ERROR, "%s", err);
        if (!record)
            elog(ERROR, "no record found"); /* shouldn't happen */
 
-       startptr = InvalidXLogRecPtr;
-
        LogicalDecodingProcessRecord(ctx, ctx->reader);
 
        /* only continue till we found a consistent spot */
index 7693c98949d620981ef320be9d30d3ba1ef846c4..25b89e5616c1e13d456fc8d417db8b74e1a02e0d 100644 (file)
@@ -127,7 +127,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
    MemoryContext per_query_ctx;
    MemoryContext oldcontext;
    XLogRecPtr  end_of_wal;
-   XLogRecPtr  startptr;
    LogicalDecodingContext *ctx;
    ResourceOwner old_resowner = CurrentResourceOwner;
    ArrayType  *arr;
@@ -269,28 +268,21 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
         * xacts that committed after the slot's confirmed_flush can be
         * accumulated into reorder buffers.
         */
-       startptr = MyReplicationSlot->data.restart_lsn;
+       XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
 
        /* invalidate non-timetravel entries */
        InvalidateSystemCaches();
 
        /* Decode until we run out of records */
-       while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
-              (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+       while (ctx->reader->EndRecPtr < end_of_wal)
        {
            XLogRecord *record;
            char       *errm = NULL;
 
-           record = XLogReadRecord(ctx->reader, startptr, &errm);
+           record = XLogReadRecord(ctx->reader, &errm);
            if (errm)
                elog(ERROR, "%s", errm);
 
-           /*
-            * Now that we've set up the xlog reader state, subsequent calls
-            * pass InvalidXLogRecPtr to say "continue from last record"
-            */
-           startptr = InvalidXLogRecPtr;
-
            /*
             * The {begin_txn,change,commit_txn}_wrapper callbacks above will
             * store the description into our tuplestore.
index bb69683e2ac955e47c0009a7f7751bc77804f279..7c896946118c58dfd946912e634ea1e9dc806b76 100644 (file)
@@ -391,7 +391,6 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
    LogicalDecodingContext *ctx;
    ResourceOwner old_resowner = CurrentResourceOwner;
-   XLogRecPtr  startlsn;
    XLogRecPtr  retlsn;
 
    PG_TRY();
@@ -411,7 +410,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
         * Start reading at the slot's restart_lsn, which we know to point to
         * a valid record.
         */
-       startlsn = MyReplicationSlot->data.restart_lsn;
+       XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
 
        /* Initialize our return value in case we don't do anything */
        retlsn = MyReplicationSlot->data.confirmed_flush;
@@ -420,10 +419,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
        InvalidateSystemCaches();
 
        /* Decode at least one record, until we run out of records */
-       while ((!XLogRecPtrIsInvalid(startlsn) &&
-               startlsn < moveto) ||
-              (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
-               ctx->reader->EndRecPtr < moveto))
+       while (ctx->reader->EndRecPtr < moveto)
        {
            char       *errm = NULL;
            XLogRecord *record;
@@ -432,13 +428,10 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
             * Read records.  No changes are generated in fast_forward mode,
             * but snapbuilder/slot statuses are updated properly.
             */
-           record = XLogReadRecord(ctx->reader, startlsn, &errm);
+           record = XLogReadRecord(ctx->reader, &errm);
            if (errm)
                elog(ERROR, "%s", errm);
 
-           /* Read sequentially from now on */
-           startlsn = InvalidXLogRecPtr;
-
            /*
             * Process the record.  Storage-level changes are ignored in
             * fast_forward mode, but other modules (such as snapbuilder)
index 9c063749b62b28f42cf2d59087721e5d6579c272..0c65f1660b2e70b3652a15161ff3757c0796d172 100644 (file)
@@ -191,7 +191,6 @@ static volatile sig_atomic_t got_STOPPING = false;
 static volatile sig_atomic_t replication_active = false;
 
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
-static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
 /* A sample associating a WAL location with the time it was written. */
 typedef struct
@@ -1130,9 +1129,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
    pq_endmessage(&buf);
    pq_flush();
 
-
    /* Start reading WAL from the oldest required WAL. */
-   logical_startptr = MyReplicationSlot->data.restart_lsn;
+   XLogBeginRead(logical_decoding_ctx->reader,
+                 MyReplicationSlot->data.restart_lsn);
 
    /*
     * Report the location after which we'll send out further commits as the
@@ -2791,8 +2790,7 @@ XLogSendLogical(void)
     */
    WalSndCaughtUp = false;
 
-   record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
-   logical_startptr = InvalidXLogRecPtr;
+   record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
 
    /* xlog record was invalid */
    if (errm != NULL)
index b6429827cfb58c07e6766e0a3c33fd4580c7932d..eb61cb8803f3ccae67717a5c90476e946b83d307 100644 (file)
@@ -68,15 +68,14 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
    if (xlogreader == NULL)
        pg_fatal("out of memory");
 
+   XLogBeginRead(xlogreader, startpoint);
    do
    {
-       record = XLogReadRecord(xlogreader, startpoint, &errormsg);
+       record = XLogReadRecord(xlogreader, &errormsg);
 
        if (record == NULL)
        {
-           XLogRecPtr  errptr;
-
-           errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
+           XLogRecPtr  errptr = xlogreader->EndRecPtr;
 
            if (errormsg)
                pg_fatal("could not read WAL record at %X/%X: %s",
@@ -89,8 +88,6 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
        extractPageInfo(xlogreader);
 
-       startpoint = InvalidXLogRecPtr; /* continue reading at next record */
-
    } while (xlogreader->ReadRecPtr != endpoint);
 
    XLogReaderFree(xlogreader);
@@ -120,7 +117,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
    if (xlogreader == NULL)
        pg_fatal("out of memory");
 
-   record = XLogReadRecord(xlogreader, ptr, &errormsg);
+   XLogBeginRead(xlogreader, ptr);
+   record = XLogReadRecord(xlogreader, &errormsg);
    if (record == NULL)
    {
        if (errormsg)
@@ -182,7 +180,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
    {
        uint8       info;
 
-       record = XLogReadRecord(xlogreader, searchptr, &errormsg);
+       XLogBeginRead(xlogreader, searchptr);
+       record = XLogReadRecord(xlogreader, &errormsg);
 
        if (record == NULL)
        {
index 23f3518c2e3eb5a1621d11738feacc2b068b9196..83202b5b876be70cb073126ff00dfaa309508059 100644 (file)
@@ -1053,7 +1053,7 @@ main(int argc, char **argv)
    for (;;)
    {
        /* try to read the next record */
-       record = XLogReadRecord(xlogreader_state, first_record, &errormsg);
+       record = XLogReadRecord(xlogreader_state, &errormsg);
        if (!record)
        {
            if (!config.follow || private.endptr_reached)
@@ -1065,9 +1065,6 @@ main(int argc, char **argv)
            }
        }
 
-       /* after reading the first record, continue at next one */
-       first_record = InvalidXLogRecPtr;
-
        /* apply all specified filters */
        if (config.filter_by_rmgr != -1 &&
            config.filter_by_rmgr != record->xl_rmid)
index f7cc8c4e1dedf61beecc7c4a077e043b504c3221..4582196e1851ad7f5454c901c46e1c434300e277 100644 (file)
@@ -13,7 +13,9 @@
  *     how to use the XLogReader infrastructure.
  *
  *     The basic idea is to allocate an XLogReaderState via
- *     XLogReaderAllocate(), and call XLogReadRecord() until it returns NULL.
+ *     XLogReaderAllocate(), position the reader to the first record with
+ *     XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
+ *     until it returns NULL.
  *
  *     After reading a record with XLogReadRecord(), it's decomposed into
  *     the per-block and main data parts, and the parts can be accessed
@@ -126,7 +128,8 @@ struct XLogReaderState
 
    /*
     * Start and end point of last record read.  EndRecPtr is also used as the
-    * position to read next, if XLogReadRecord receives an invalid recptr.
+    * position to read next.  Calling XLogBeginRead() sets EndRecPtr to the
+    * starting position and ReadRecPtr to invalid.
     */
    XLogRecPtr  ReadRecPtr;     /* start of last record read */
    XLogRecPtr  EndRecPtr;      /* end+1 of last record read */
@@ -239,18 +242,20 @@ typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
                               int segsize, const char *waldir);
 
+/* Position the XLogReader to given record */
+extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
+#ifdef FRONTEND
+extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
+#endif                         /* FRONTEND */
+
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-                                        XLogRecPtr recptr, char **errormsg);
+                                        char **errormsg);
 
 /* Validate a page */
 extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
                                         XLogRecPtr recptr, char *phdr);
 
-#ifdef FRONTEND
-extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
-#endif                         /* FRONTEND */
-
 /*
  * Error information from WALRead that both backend and frontend caller can
  * process.  Currently only errors from pg_pread can be reported.