Rework WAL-reading supporting structs
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 24 Sep 2019 19:08:31 +0000 (16:08 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 24 Sep 2019 19:39:53 +0000 (16:39 -0300)
The state-tracking of WAL reading in various places was pretty messy,
mostly because the ancient physical-replication WAL reading code wasn't
using the XLogReader abstraction.  This led to some untidy code.  Make
it prettier by creating two additional supporting structs,
WALSegmentContext and WALOpenSegment which keep track of WAL-reading
state.  This makes code cleaner, as well as supports more future
cleanup.

Author: Antonin Houska
Reviewed-by: Álvaro Herrera and (older versions) Robert Haas
Discussion: https://postgr.es/m/14984.1554998742@spoje.net

12 files changed:
src/backend/access/transam/twophase.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogreader.c
src/backend/access/transam/xlogutils.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/walsender.c
src/bin/pg_rewind/parsexlog.c
src/bin/pg_waldump/pg_waldump.c
src/include/access/xlogreader.h
src/include/access/xlogutils.h
src/include/replication/logicalfuncs.h

index 477709bbc23c3fecbec2042513abe1f932a79d2a..546bd43ce8b483987115bd1281e5e3fc5d4ea186 100644 (file)
@@ -1377,7 +1377,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
  *
  * Note clearly that this function can access WAL during normal operation,
  * similarly to the way WALSender or Logical Decoding would do.
- *
  */
 static void
 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
@@ -1386,8 +1385,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
    XLogReaderState *xlogreader;
    char       *errormsg;
 
-   xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
-                                   NULL);
+   xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+                                   &read_local_xlog_page, NULL);
    if (!xlogreader)
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
index 501f46fd52d557b429ff980a6cd5ca62cd1722ec..6c69eb6dd76dce2243dae82c56cb077b68091d9b 100644 (file)
@@ -885,8 +885,7 @@ static int  XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                         int source, bool notfoundOk);
 static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
 static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-                        int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-                        TimeLineID *readTLI);
+                        int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                        bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
@@ -1195,7 +1194,8 @@ XLogInsertRecord(XLogRecData *rdata,
            appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
 
        if (!debug_reader)
-           debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
+           debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
+                                             NULL, NULL);
 
        if (!debug_reader)
        {
@@ -4296,7 +4296,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
            XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
            offset = XLogSegmentOffset(xlogreader->latestPagePtr,
                                       wal_segment_size);
-           XLogFileName(fname, xlogreader->readPageTLI, segno,
+           XLogFileName(fname, xlogreader->seg.ws_tli, segno,
                         wal_segment_size);
            ereport(emode_for_corrupt_record(emode,
                                             RecPtr ? RecPtr : EndRecPtr),
@@ -6353,7 +6353,8 @@ StartupXLOG(void)
 
    /* Set up XLOG reader facility */
    MemSet(&private, 0, sizeof(XLogPageReadPrivate));
-   xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private);
+   xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+                                   &XLogPageRead, &private);
    if (!xlogreader)
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -7355,7 +7356,7 @@ StartupXLOG(void)
     * and we were reading the old WAL from a segment belonging to a higher
     * timeline.
     */
-   EndOfLogTLI = xlogreader->readPageTLI;
+   EndOfLogTLI = xlogreader->seg.ws_tli;
 
    /*
     * Complain if we did not roll forward far enough to render the backup
@@ -11523,7 +11524,7 @@ CancelBackup(void)
  */
 static int
 XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
-            XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
+            XLogRecPtr targetRecPtr, char *readBuf)
 {
    XLogPageReadPrivate *private =
    (XLogPageReadPrivate *) xlogreader->private_data;
@@ -11640,7 +11641,7 @@ retry:
    Assert(targetPageOff == readOff);
    Assert(reqLen <= readLen);
 
-   *readTLI = curFileTLI;
+   xlogreader->seg.ws_tli = curFileTLI;
 
    /*
     * Check the page header immediately, so that we can retry immediately if
index a66e3324b117f75fec5d2b5a046833ef01a9869a..27c27303d6cc5df230197fc71fba366a1371d396 100644 (file)
@@ -68,8 +68,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
  * Returns NULL if the xlogreader couldn't be allocated.
  */
 XLogReaderState *
-XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
-                  void *private_data)
+XLogReaderAllocate(int wal_segment_size, const char *waldir,
+                  XLogPageReadCB pagereadfunc, void *private_data)
 {
    XLogReaderState *state;
 
@@ -96,7 +96,10 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
        return NULL;
    }
 
-   state->wal_segment_size = wal_segment_size;
+   /* Initialize segment info. */
+   WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
+                      waldir);
+
    state->read_page = pagereadfunc;
    /* system_identifier initialized to zeroes above */
    state->private_data = private_data;
@@ -198,6 +201,23 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
    return true;
 }
 
+/*
+ * Initialize the passed segment structs.
+ */
+void
+WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+                  int segsize, const char *waldir)
+{
+   seg->ws_file = -1;
+   seg->ws_segno = 0;
+   seg->ws_off = 0;
+   seg->ws_tli = 0;
+
+   segcxt->ws_segsize = segsize;
+   if (waldir)
+       snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
+}
+
 /*
  * Attempt to read an XLOG record.
  *
@@ -490,8 +510,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
        (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
    {
        /* Pretend it extends to end of segment */
-       state->EndRecPtr += state->wal_segment_size - 1;
-       state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
+       state->EndRecPtr += state->segcxt.ws_segsize - 1;
+       state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
    }
 
    if (DecodeXLogRecord(state, record, errormsg))
@@ -533,12 +553,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
    Assert((pageptr % XLOG_BLCKSZ) == 0);
 
-   XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
-   targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
+   XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
+   targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
 
    /* check whether we have all the requested data already */
-   if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
-       reqLen <= state->readLen)
+   if (targetSegNo == state->seg.ws_segno &&
+       targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
        return state->readLen;
 
    /*
@@ -553,13 +573,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
     * record is.  This is so that we can check the additional identification
     * info that is present in the first page's "long" header.
     */
-   if (targetSegNo != state->readSegNo && targetPageOff != 0)
+   if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
    {
        XLogRecPtr  targetSegmentPtr = pageptr - targetPageOff;
 
        readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
                                   state->currRecPtr,
-                                  state->readBuf, &state->readPageTLI);
+                                  state->readBuf);
        if (readLen < 0)
            goto err;
 
@@ -577,7 +597,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
     */
    readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
                               state->currRecPtr,
-                              state->readBuf, &state->readPageTLI);
+                              state->readBuf);
    if (readLen < 0)
        goto err;
 
@@ -596,7 +616,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
    {
        readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
                                   state->currRecPtr,
-                                  state->readBuf, &state->readPageTLI);
+                                  state->readBuf);
        if (readLen < 0)
            goto err;
    }
@@ -608,8 +628,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
        goto err;
 
    /* update read state information */
-   state->readSegNo = targetSegNo;
-   state->readOff = targetPageOff;
+   state->seg.ws_segno = targetSegNo;
+   state->seg.ws_off = targetPageOff;
    state->readLen = readLen;
 
    return readLen;
@@ -625,8 +645,8 @@ err:
 static void
 XLogReaderInvalReadState(XLogReaderState *state)
 {
-   state->readSegNo = 0;
-   state->readOff = 0;
+   state->seg.ws_segno = 0;
+   state->seg.ws_off = 0;
    state->readLen = 0;
 }
 
@@ -745,16 +765,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 
    Assert((recptr % XLOG_BLCKSZ) == 0);
 
-   XLByteToSeg(recptr, segno, state->wal_segment_size);
-   offset = XLogSegmentOffset(recptr, state->wal_segment_size);
+   XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
+   offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
 
-   XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
+   XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
 
    if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
    {
        char        fname[MAXFNAMELEN];
 
-       XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+       XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
        report_invalid_record(state,
                              "invalid magic number %04X in log segment %s, offset %u",
@@ -768,7 +788,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    {
        char        fname[MAXFNAMELEN];
 
-       XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+       XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
        report_invalid_record(state,
                              "invalid info bits %04X in log segment %s, offset %u",
@@ -791,7 +811,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
                                  (unsigned long long) state->system_identifier);
            return false;
        }
-       else if (longhdr->xlp_seg_size != state->wal_segment_size)
+       else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
        {
            report_invalid_record(state,
                                  "WAL file is from different database system: incorrect segment size in page header");
@@ -808,7 +828,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    {
        char        fname[MAXFNAMELEN];
 
-       XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+       XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
        /* hmm, first page of file doesn't have a long header? */
        report_invalid_record(state,
@@ -828,7 +848,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    {
        char        fname[MAXFNAMELEN];
 
-       XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+       XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
        report_invalid_record(state,
                              "unexpected pageaddr %X/%X in log segment %s, offset %u",
@@ -853,7 +873,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
        {
            char        fname[MAXFNAMELEN];
 
-           XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+           XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
            report_invalid_record(state,
                                  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
@@ -997,7 +1017,6 @@ out:
 
 #endif                         /* FRONTEND */
 
-
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
  * ----------------------------------------
index 1fc39333f1596c9c072f67d7afa9d91b427e7cc5..5f1e5ba75d5804ec9b5fcc40cba6e9e4ea3599f4 100644 (file)
@@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-   const XLogRecPtr lastReadPage = state->readSegNo *
-   state->wal_segment_size + state->readOff;
+   const XLogRecPtr lastReadPage = state->seg.ws_segno *
+   state->segcxt.ws_segsize + state->seg.ws_off;
 
    Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
    Assert(wantLength <= XLOG_BLCKSZ);
@@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
    if (state->currTLIValidUntil != InvalidXLogRecPtr &&
        state->currTLI != ThisTimeLineID &&
        state->currTLI != 0 &&
-       ((wantPage + wantLength) / state->wal_segment_size) <
-       (state->currTLIValidUntil / state->wal_segment_size))
+       ((wantPage + wantLength) / state->segcxt.ws_segsize) <
+       (state->currTLIValidUntil / state->segcxt.ws_segsize))
        return;
 
    /*
@@ -869,12 +869,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
         * by a promotion or replay from a cascaded replica.
         */
        List       *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+       XLogRecPtr  endOfSegment;
 
-       XLogRecPtr  endOfSegment = (((wantPage / state->wal_segment_size) + 1)
-                                   * state->wal_segment_size) - 1;
-
-       Assert(wantPage / state->wal_segment_size ==
-              endOfSegment / state->wal_segment_size);
+       endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
+           state->segcxt.ws_segsize - 1;
+       Assert(wantPage / state->segcxt.ws_segsize ==
+              endOfSegment / state->segcxt.ws_segsize);
 
        /*
         * Find the timeline of the last LSN on the segment containing
@@ -909,8 +909,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
  */
 int
 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-                    int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
-                    TimeLineID *pageTLI)
+                    int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 {
    XLogRecPtr  read_upto,
                loc;
@@ -933,8 +932,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
            read_upto = GetFlushRecPtr();
        else
            read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-
-       *pageTLI = ThisTimeLineID;
+       state->seg.ws_tli = ThisTimeLineID;
 
        /*
         * Check which timeline to get the record from.
@@ -984,14 +982,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
            read_upto = state->currTLIValidUntil;
 
            /*
-            * Setting pageTLI to our wanted record's TLI is slightly wrong;
+            * Setting ws_tli to our wanted record's TLI is slightly wrong;
             * the page might begin on an older timeline if it contains a
             * timeline switch, since its xlog segment will have been copied
             * from the prior timeline. This is pretty harmless though, as
             * nothing cares so long as the timeline doesn't go backwards.  We
             * should read the page header instead; FIXME someday.
             */
-           *pageTLI = state->currTLI;
+           state->seg.ws_tli = state->currTLI;
 
            /* No need to wait on a historical timeline */
            break;
@@ -1022,7 +1020,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
     * as 'count', read the whole page anyway. It's guaranteed to be
     * zero-padded up to the page boundary if it's incomplete.
     */
-   XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
+   XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
             XLOG_BLCKSZ);
 
    /* number of valid bytes in the buffer */
index f8b9020081e558f492254ad2262fb27e5e146581..da265f52940d3ddcbe25d31b424ec2b12577aca0 100644 (file)
@@ -173,7 +173,7 @@ StartupDecodingContext(List *output_plugin_options,
 
    ctx->slot = slot;
 
-   ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+   ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
    if (!ctx->reader)
        ereport(ERROR,
                (errcode(ERRCODE_OUT_OF_MEMORY),
index d974400d6efa89906b6681ec345f574be35f978a..d1cf80d44177668bb26691dc56cd4423368756c2 100644 (file)
@@ -116,10 +116,10 @@ check_permissions(void)
 
 int
 logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-                            int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+                            int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 {
    return read_local_xlog_page(state, targetPagePtr, reqLen,
-                               targetRecPtr, cur_page, pageTLI);
+                               targetRecPtr, cur_page);
 }
 
 /*
index 23870a25a563df71f422b778554cb7df96cfce36..eb4a98cc912bae6a68137a33247adb607e88ea28 100644 (file)
@@ -128,16 +128,8 @@ bool       log_replication_commands = false;
  */
 bool       wake_wal_senders = false;
 
-/*
- * These variables are used similarly to openLogFile/SegNo/Off,
- * but for walsender to read the XLOG.
- */
-static int sendFile = -1;
-static XLogSegNo sendSegNo = 0;
-static uint32 sendOff = 0;
-
-/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+static WALOpenSegment *sendSeg = NULL;
+static WALSegmentContext *sendCxt = NULL;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -256,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -285,6 +277,13 @@ InitWalSender(void)
 
    /* Initialize empty timestamp buffer for lag tracking. */
    lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
+
+   /* Make sure we can remember the current read position in XLOG. */
+   sendSeg = (WALOpenSegment *)
+       MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
+   sendCxt = (WALSegmentContext *)
+       MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
+   WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
 }
 
 /*
@@ -301,10 +300,10 @@ WalSndErrorCleanup(void)
    ConditionVariableCancelSleep();
    pgstat_report_wait_end();
 
-   if (sendFile >= 0)
+   if (sendSeg->ws_file >= 0)
    {
-       close(sendFile);
-       sendFile = -1;
+       close(sendSeg->ws_file);
+       sendSeg->ws_file = -1;
    }
 
    if (MyReplicationSlot != NULL)
@@ -763,7 +762,7 @@ StartReplication(StartReplicationCmd *cmd)
  */
 static int
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-                      XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+                      XLogRecPtr targetRecPtr, char *cur_page)
 {
    XLogRecPtr  flushptr;
    int         count;
@@ -787,7 +786,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        count = flushptr - targetPagePtr;   /* part of the page available */
 
    /* now actually read the data, we know it's there */
-   XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+   XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
 
    return count;
 }
@@ -2364,7 +2363,7 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
 {
    char       *p;
    XLogRecPtr  recptr;
@@ -2382,17 +2381,18 @@ retry:
        int         segbytes;
        int         readbytes;
 
-       startoff = XLogSegmentOffset(recptr, wal_segment_size);
+       startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
 
-       if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
+       if (sendSeg->ws_file < 0 ||
+           !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
        {
            char        path[MAXPGPATH];
 
            /* Switch to another logfile segment */
-           if (sendFile >= 0)
-               close(sendFile);
+           if (sendSeg->ws_file >= 0)
+               close(sendSeg->ws_file);
 
-           XLByteToSeg(recptr, sendSegNo, wal_segment_size);
+           XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
 
            /*-------
             * When reading from a historic timeline, and there is a timeline
@@ -2420,20 +2420,20 @@ retry:
             * used portion of the old segment is copied to the new file.
             *-------
             */
-           curFileTimeLine = sendTimeLine;
+           sendSeg->ws_tli = sendTimeLine;
            if (sendTimeLineIsHistoric)
            {
                XLogSegNo   endSegNo;
 
-               XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
-               if (sendSegNo == endSegNo)
-                   curFileTimeLine = sendTimeLineNextTLI;
+               XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+               if (sendSeg->ws_segno == endSegNo)
+                   sendSeg->ws_tli = sendTimeLineNextTLI;
            }
 
-           XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
+           XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
 
-           sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-           if (sendFile < 0)
+           sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+           if (sendSeg->ws_file < 0)
            {
                /*
                 * If the file is not found, assume it's because the standby
@@ -2444,58 +2444,58 @@ retry:
                    ereport(ERROR,
                            (errcode_for_file_access(),
                             errmsg("requested WAL segment %s has already been removed",
-                                   XLogFileNameP(curFileTimeLine, sendSegNo))));
+                                   XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
                else
                    ereport(ERROR,
                            (errcode_for_file_access(),
                             errmsg("could not open file \"%s\": %m",
                                    path)));
            }
-           sendOff = 0;
+           sendSeg->ws_off = 0;
        }
 
        /* Need to seek in the file? */
-       if (sendOff != startoff)
+       if (sendSeg->ws_off != startoff)
        {
-           if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+           if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not seek in log segment %s to offset %u: %m",
-                               XLogFileNameP(curFileTimeLine, sendSegNo),
+                               XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
                                startoff)));
-           sendOff = startoff;
+           sendSeg->ws_off = startoff;
        }
 
        /* How many bytes are within this segment? */
-       if (nbytes > (wal_segment_size - startoff))
-           segbytes = wal_segment_size - startoff;
+       if (nbytes > (segcxt->ws_segsize - startoff))
+           segbytes = segcxt->ws_segsize - startoff;
        else
            segbytes = nbytes;
 
        pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-       readbytes = read(sendFile, p, segbytes);
+       readbytes = read(sendSeg->ws_file, p, segbytes);
        pgstat_report_wait_end();
        if (readbytes < 0)
        {
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-                           XLogFileNameP(curFileTimeLine, sendSegNo),
-                           sendOff, (Size) segbytes)));
+                           XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
+                           sendSeg->ws_off, (Size) segbytes)));
        }
        else if (readbytes == 0)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_DATA_CORRUPTED),
                     errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-                           XLogFileNameP(curFileTimeLine, sendSegNo),
-                           sendOff, readbytes, (Size) segbytes)));
+                           XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
+                           sendSeg->ws_off, readbytes, (Size) segbytes)));
        }
 
        /* Update state for read */
        recptr += readbytes;
 
-       sendOff += readbytes;
+       sendSeg->ws_off += readbytes;
        nbytes -= readbytes;
        p += readbytes;
    }
@@ -2507,7 +2507,7 @@ retry:
     * read() succeeds in that case, but the data we tried to read might
     * already have been overwritten with new WAL records.
     */
-   XLByteToSeg(startptr, segno, wal_segment_size);
+   XLByteToSeg(startptr, segno, segcxt->ws_segsize);
    CheckXLogRemoved(segno, ThisTimeLineID);
 
    /*
@@ -2526,10 +2526,10 @@ retry:
        walsnd->needreload = false;
        SpinLockRelease(&walsnd->mutex);
 
-       if (reload && sendFile >= 0)
+       if (reload && sendSeg->ws_file >= 0)
        {
-           close(sendFile);
-           sendFile = -1;
+           close(sendSeg->ws_file);
+           sendSeg->ws_file = -1;
 
            goto retry;
        }
@@ -2695,9 +2695,9 @@ XLogSendPhysical(void)
    if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
    {
        /* close the current file. */
-       if (sendFile >= 0)
-           close(sendFile);
-       sendFile = -1;
+       if (sendSeg->ws_file >= 0)
+           close(sendSeg->ws_file);
+       sendSeg->ws_file = -1;
 
        /* Send CopyDone */
        pq_putmessage_noblock('c', NULL, 0);
@@ -2768,7 +2768,7 @@ XLogSendPhysical(void)
     * calls.
     */
    enlargeStringInfo(&output_message, nbytes);
-   XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+   XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
    output_message.len += nbytes;
    output_message.data[output_message.len] = '\0';
 
index 63c3879ead81fb47489419331ee2b26053972c82..264a8f4db5fbee0a2284cb49e5530383d962ef1d 100644 (file)
@@ -43,14 +43,12 @@ static char xlogfpath[MAXPGPATH];
 
 typedef struct XLogPageReadPrivate
 {
-   const char *datadir;
    int         tliIndex;
 } XLogPageReadPrivate;
 
 static int SimpleXLogPageRead(XLogReaderState *xlogreader,
                               XLogRecPtr targetPagePtr,
-                              int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-                              TimeLineID *pageTLI);
+                              int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 
 /*
  * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
@@ -66,9 +64,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
    char       *errormsg;
    XLogPageReadPrivate private;
 
-   private.datadir = datadir;
    private.tliIndex = tliIndex;
-   xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+   xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
                                    &private);
    if (xlogreader == NULL)
        pg_fatal("out of memory");
@@ -119,9 +116,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
    XLogPageReadPrivate private;
    XLogRecPtr  endptr;
 
-   private.datadir = datadir;
    private.tliIndex = tliIndex;
-   xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+   xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
                                    &private);
    if (xlogreader == NULL)
        pg_fatal("out of memory");
@@ -177,9 +173,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
            forkptr += SizeOfXLogShortPHD;
    }
 
-   private.datadir = datadir;
    private.tliIndex = tliIndex;
-   xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+   xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
                                    &private);
    if (xlogreader == NULL)
        pg_fatal("out of memory");
@@ -237,8 +232,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 /* XLogReader callback function, to read a WAL page */
 static int
 SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-                  int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-                  TimeLineID *pageTLI)
+                  int reqLen, XLogRecPtr targetRecPtr, char *readBuf)
 {
    XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
    uint32      targetPageOff;
@@ -283,7 +277,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
        XLogFileName(xlogfname, targetHistory[private->tliIndex].tli,
                     xlogreadsegno, WalSegSz);
 
-       snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
+       snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s",
+                xlogreader->segcxt.ws_dir, xlogfname);
 
        xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
 
@@ -321,7 +316,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 
    Assert(targetSegNo == xlogreadsegno);
 
-   *pageTLI = targetHistory[private->tliIndex].tli;
+   xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli;
    return XLOG_BLCKSZ;
 }
 
index b95d467805a99c8d5e28fcdd58a8b81b8842f670..b79208cd73672be9362d59bba09b0e6a41384670 100644 (file)
@@ -33,7 +33,6 @@ static int    WalSegSz;
 typedef struct XLogDumpPrivate
 {
    TimeLineID  timeline;
-   char       *inpath;
    XLogRecPtr  startptr;
    XLogRecPtr  endptr;
    bool        endptr_reached;
@@ -224,7 +223,7 @@ search_directory(const char *directory, const char *fname)
 }
 
 /*
- * Identify the target directory and set WalSegSz.
+ * Identify the target directory.
  *
  * Try to find the file in several places:
  * if directory != NULL:
@@ -235,29 +234,22 @@ search_directory(const char *directory, const char *fname)
  *  XLOGDIR /
  *  $PGDATA / XLOGDIR /
  *
- * Set the valid target directory in private->inpath.
+ * The valid target directory is returned.
  */
-static void
-identify_target_directory(XLogDumpPrivate *private, char *directory,
-                         char *fname)
+static char *
+identify_target_directory(char *directory, char *fname)
 {
    char        fpath[MAXPGPATH];
 
    if (directory != NULL)
    {
        if (search_directory(directory, fname))
-       {
-           private->inpath = pg_strdup(directory);
-           return;
-       }
+           return pg_strdup(directory);
 
        /* directory / XLOGDIR */
        snprintf(fpath, MAXPGPATH, "%s/%s", directory, XLOGDIR);
        if (search_directory(fpath, fname))
-       {
-           private->inpath = pg_strdup(fpath);
-           return;
-       }
+           return pg_strdup(fpath);
    }
    else
    {
@@ -265,16 +257,10 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
 
        /* current directory */
        if (search_directory(".", fname))
-       {
-           private->inpath = pg_strdup(".");
-           return;
-       }
+           return pg_strdup(".");
        /* XLOGDIR */
        if (search_directory(XLOGDIR, fname))
-       {
-           private->inpath = pg_strdup(XLOGDIR);
-           return;
-       }
+           return pg_strdup(XLOGDIR);
 
        datadir = getenv("PGDATA");
        /* $PGDATA / XLOGDIR */
@@ -282,10 +268,7 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
        {
            snprintf(fpath, MAXPGPATH, "%s/%s", datadir, XLOGDIR);
            if (search_directory(fpath, fname))
-           {
-               private->inpath = pg_strdup(fpath);
-               return;
-           }
+               return pg_strdup(fpath);
        }
    }
 
@@ -294,6 +277,8 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
        fatal_error("could not locate WAL file \"%s\"", fname);
    else
        fatal_error("could not find any WAL file");
+
+   return NULL;                /* not reached */
 }
 
 /*
@@ -423,7 +408,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
  */
 static int
 XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-                XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
+                XLogRecPtr targetPtr, char *readBuff)
 {
    XLogDumpPrivate *private = state->private_data;
    int         count = XLOG_BLCKSZ;
@@ -441,7 +426,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
        }
    }
 
-   XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
+   XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
                     readBuff, count);
 
    return count;
@@ -820,6 +805,7 @@ main(int argc, char **argv)
    XLogDumpStats stats;
    XLogRecord *record;
    XLogRecPtr  first_record;
+   char       *waldir = NULL;
    char       *errormsg;
 
    static struct option long_options[] = {
@@ -912,7 +898,7 @@ main(int argc, char **argv)
                }
                break;
            case 'p':
-               private.inpath = pg_strdup(optarg);
+               waldir = pg_strdup(optarg);
                break;
            case 'r':
                {
@@ -994,13 +980,13 @@ main(int argc, char **argv)
        goto bad_argument;
    }
 
-   if (private.inpath != NULL)
+   if (waldir != NULL)
    {
        /* validate path points to directory */
-       if (!verify_directory(private.inpath))
+       if (!verify_directory(waldir))
        {
            pg_log_error("path \"%s\" could not be opened: %s",
-                        private.inpath, strerror(errno));
+                        waldir, strerror(errno));
            goto bad_argument;
        }
    }
@@ -1015,17 +1001,17 @@ main(int argc, char **argv)
 
        split_path(argv[optind], &directory, &fname);
 
-       if (private.inpath == NULL && directory != NULL)
+       if (waldir == NULL && directory != NULL)
        {
-           private.inpath = directory;
+           waldir = directory;
 
-           if (!verify_directory(private.inpath))
+           if (!verify_directory(waldir))
                fatal_error("could not open directory \"%s\": %s",
-                           private.inpath, strerror(errno));
+                           waldir, strerror(errno));
        }
 
-       identify_target_directory(&private, private.inpath, fname);
-       fd = open_file_in_directory(private.inpath, fname);
+       waldir = identify_target_directory(waldir, fname);
+       fd = open_file_in_directory(waldir, fname);
        if (fd < 0)
            fatal_error("could not open file \"%s\"", fname);
        close(fd);
@@ -1056,7 +1042,7 @@ main(int argc, char **argv)
            /* ignore directory, already have that */
            split_path(argv[optind + 1], &directory, &fname);
 
-           fd = open_file_in_directory(private.inpath, fname);
+           fd = open_file_in_directory(waldir, fname);
            if (fd < 0)
                fatal_error("could not open file \"%s\"", fname);
            close(fd);
@@ -1088,7 +1074,7 @@ main(int argc, char **argv)
        }
    }
    else
-       identify_target_directory(&private, private.inpath, NULL);
+       waldir = identify_target_directory(waldir, NULL);
 
    /* we don't know what to print */
    if (XLogRecPtrIsInvalid(private.startptr))
@@ -1100,7 +1086,7 @@ main(int argc, char **argv)
    /* done with argument parsing, do the actual work */
 
    /* we have everything we need, start reading */
-   xlogreader_state = XLogReaderAllocate(WalSegSz, XLogDumpReadPage,
+   xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
                                          &private);
    if (!xlogreader_state)
        fatal_error("out of memory");
index 735b1bd2fd6418d570b52f4e03eda5cabfc8959e..1bbee386e8da95ee05c3684ab064fa022b7c4631 100644 (file)
 
 #include "access/xlogrecord.h"
 
+/* WALOpenSegment represents a WAL segment being read. */
+typedef struct WALOpenSegment
+{
+   int         ws_file;        /* segment file descriptor */
+   XLogSegNo   ws_segno;       /* segment number */
+   uint32      ws_off;         /* offset in the segment */
+   TimeLineID  ws_tli;         /* timeline ID of the currently open file */
+} WALOpenSegment;
+
+/* WALSegmentContext carries context information about WAL segments to read */
+typedef struct WALSegmentContext
+{
+   char        ws_dir[MAXPGPATH];
+   int         ws_segsize;
+} WALSegmentContext;
+
 typedef struct XLogReaderState XLogReaderState;
 
 /* Function type definition for the read_page callback */
@@ -38,8 +54,7 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
                               XLogRecPtr targetPagePtr,
                               int reqLen,
                               XLogRecPtr targetRecPtr,
-                              char *readBuf,
-                              TimeLineID *pageTLI);
+                              char *readBuf);
 
 typedef struct
 {
@@ -77,11 +92,6 @@ struct XLogReaderState
     * ----------------------------------------
     */
 
-   /*
-    * Segment size of the to-be-parsed data (mandatory).
-    */
-   int         wal_segment_size;
-
    /*
     * Data input callback (mandatory).
     *
@@ -99,9 +109,8 @@ struct XLogReaderState
     * actual WAL record it's interested in.  In that case, targetRecPtr can
     * be used to determine which timeline to read the page from.
     *
-    * The callback shall set *pageTLI to the TLI of the file the page was
-    * read from.  It is currently used only for error reporting purposes, to
-    * reconstruct the name of the WAL file where an error occurred.
+    * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+    * read from.
     */
    XLogPageReadCB read_page;
 
@@ -156,10 +165,9 @@ struct XLogReaderState
    char       *readBuf;
    uint32      readLen;
 
-   /* last read segment, segment offset, TLI for data currently in readBuf */
-   XLogSegNo   readSegNo;
-   uint32      readOff;
-   TimeLineID  readPageTLI;
+   /* last read XLOG position for data currently in readBuf */
+   WALSegmentContext segcxt;
+   WALOpenSegment seg;
 
    /*
     * beginning of prior page read, and its TLI.  Doesn't necessarily
@@ -202,12 +210,17 @@ struct XLogReaderState
 
 /* Get a new XLogReader */
 extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
+                                          const char *waldir,
                                           XLogPageReadCB pagereadfunc,
                                           void *private_data);
 
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
+/* Initialize supporting structures */
+extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+                              int segsize, const char *waldir);
+
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
                                         XLogRecPtr recptr, char **errormsg);
index 4105b59904b457f09ddc0fa92e8bd9ca9d5b07f6..2df98e45b204a59c593f4638d0ca262bcaab40e0 100644 (file)
@@ -49,8 +49,7 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 
 extern int read_local_xlog_page(XLogReaderState *state,
                                 XLogRecPtr targetPagePtr, int reqLen,
-                                XLogRecPtr targetRecPtr, char *cur_page,
-                                TimeLineID *pageTLI);
+                                XLogRecPtr targetRecPtr, char *cur_page);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
                                      XLogRecPtr wantPage, uint32 wantLength);
index a9c178a9e687b715c7ebe4fe479ef277760a3e49..012096f183d2f37e232da5f52758015d71e08e21 100644 (file)
@@ -14,6 +14,6 @@
 extern int logical_read_local_xlog_page(XLogReaderState *state,
                                         XLogRecPtr targetPagePtr,
                                         int reqLen, XLogRecPtr targetRecPtr,
-                                        char *cur_page, TimeLineID *pageTLI);
+                                        char *cur_page);
 
 #endif