*
* Note clearly that this function can access WAL during normal operation,
* similarly to the way WALSender or Logical Decoding would do.
- *
*/
static void
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogReaderState *xlogreader;
char *errormsg;
- xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
- NULL);
+ xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+ &read_local_xlog_page, NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
int source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *readTLI);
+ int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, XLogRecPtr tliRecPtr);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
if (!debug_reader)
- debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
+ debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
+ NULL, NULL);
if (!debug_reader)
{
XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
offset = XLogSegmentOffset(xlogreader->latestPagePtr,
wal_segment_size);
- XLogFileName(fname, xlogreader->readPageTLI, segno,
+ XLogFileName(fname, xlogreader->seg.ws_tli, segno,
wal_segment_size);
ereport(emode_for_corrupt_record(emode,
RecPtr ? RecPtr : EndRecPtr),
/* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
- xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private);
+ xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+ &XLogPageRead, &private);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
* and we were reading the old WAL from a segment belonging to a higher
* timeline.
*/
- EndOfLogTLI = xlogreader->readPageTLI;
+ EndOfLogTLI = xlogreader->seg.ws_tli;
/*
* Complain if we did not roll forward far enough to render the backup
*/
static int
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
+ XLogRecPtr targetRecPtr, char *readBuf)
{
XLogPageReadPrivate *private =
(XLogPageReadPrivate *) xlogreader->private_data;
Assert(targetPageOff == readOff);
Assert(reqLen <= readLen);
- *readTLI = curFileTLI;
+ xlogreader->seg.ws_tli = curFileTLI;
/*
* Check the page header immediately, so that we can retry immediately if
* Returns NULL if the xlogreader couldn't be allocated.
*/
XLogReaderState *
-XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
- void *private_data)
+XLogReaderAllocate(int wal_segment_size, const char *waldir,
+ XLogPageReadCB pagereadfunc, void *private_data)
{
XLogReaderState *state;
return NULL;
}
- state->wal_segment_size = wal_segment_size;
+ /* Initialize segment info. */
+ WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
+ waldir);
+
state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
return true;
}
+/*
+ * Initialize the passed segment structs.
+ */
+void
+WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+ int segsize, const char *waldir)
+{
+ seg->ws_file = -1;
+ seg->ws_segno = 0;
+ seg->ws_off = 0;
+ seg->ws_tli = 0;
+
+ segcxt->ws_segsize = segsize;
+ if (waldir)
+ snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
+}
+
/*
* Attempt to read an XLOG record.
*
(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{
/* Pretend it extends to end of segment */
- state->EndRecPtr += state->wal_segment_size - 1;
- state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
+ state->EndRecPtr += state->segcxt.ws_segsize - 1;
+ state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
}
if (DecodeXLogRecord(state, record, errormsg))
Assert((pageptr % XLOG_BLCKSZ) == 0);
- XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
- targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
+ XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
+ targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
/* check whether we have all the requested data already */
- if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
- reqLen <= state->readLen)
+ if (targetSegNo == state->seg.ws_segno &&
+ targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
return state->readLen;
/*
* record is. This is so that we can check the additional identification
* info that is present in the first page's "long" header.
*/
- if (targetSegNo != state->readSegNo && targetPageOff != 0)
+ if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
state->currRecPtr,
- state->readBuf, &state->readPageTLI);
+ state->readBuf);
if (readLen < 0)
goto err;
*/
readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
state->currRecPtr,
- state->readBuf, &state->readPageTLI);
+ state->readBuf);
if (readLen < 0)
goto err;
{
readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
state->currRecPtr,
- state->readBuf, &state->readPageTLI);
+ state->readBuf);
if (readLen < 0)
goto err;
}
goto err;
/* update read state information */
- state->readSegNo = targetSegNo;
- state->readOff = targetPageOff;
+ state->seg.ws_segno = targetSegNo;
+ state->seg.ws_off = targetPageOff;
state->readLen = readLen;
return readLen;
static void
XLogReaderInvalReadState(XLogReaderState *state)
{
- state->readSegNo = 0;
- state->readOff = 0;
+ state->seg.ws_segno = 0;
+ state->seg.ws_off = 0;
state->readLen = 0;
}
Assert((recptr % XLOG_BLCKSZ) == 0);
- XLByteToSeg(recptr, segno, state->wal_segment_size);
- offset = XLogSegmentOffset(recptr, state->wal_segment_size);
+ XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
+ offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
- XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
+ XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
report_invalid_record(state,
"invalid magic number %04X in log segment %s, offset %u",
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
report_invalid_record(state,
"invalid info bits %04X in log segment %s, offset %u",
(unsigned long long) state->system_identifier);
return false;
}
- else if (longhdr->xlp_seg_size != state->wal_segment_size)
+ else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
{
report_invalid_record(state,
"WAL file is from different database system: incorrect segment size in page header");
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
/* hmm, first page of file doesn't have a long header? */
report_invalid_record(state,
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
report_invalid_record(state,
"unexpected pageaddr %X/%X in log segment %s, offset %u",
{
char fname[MAXFNAMELEN];
- XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+ XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
report_invalid_record(state,
"out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
#endif /* FRONTEND */
-
/* ----------------------------------------
* Functions for decoding the data and block references in a record.
* ----------------------------------------
void
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
{
- const XLogRecPtr lastReadPage = state->readSegNo *
- state->wal_segment_size + state->readOff;
+ const XLogRecPtr lastReadPage = state->seg.ws_segno *
+ state->segcxt.ws_segsize + state->seg.ws_off;
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
Assert(wantLength <= XLOG_BLCKSZ);
if (state->currTLIValidUntil != InvalidXLogRecPtr &&
state->currTLI != ThisTimeLineID &&
state->currTLI != 0 &&
- ((wantPage + wantLength) / state->wal_segment_size) <
- (state->currTLIValidUntil / state->wal_segment_size))
+ ((wantPage + wantLength) / state->segcxt.ws_segsize) <
+ (state->currTLIValidUntil / state->segcxt.ws_segsize))
return;
/*
* by a promotion or replay from a cascaded replica.
*/
List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+ XLogRecPtr endOfSegment;
- XLogRecPtr endOfSegment = (((wantPage / state->wal_segment_size) + 1)
- * state->wal_segment_size) - 1;
-
- Assert(wantPage / state->wal_segment_size ==
- endOfSegment / state->wal_segment_size);
+ endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
+ state->segcxt.ws_segsize - 1;
+ Assert(wantPage / state->segcxt.ws_segsize ==
+ endOfSegment / state->segcxt.ws_segsize);
/*
* Find the timeline of the last LSN on the segment containing
*/
int
read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
- TimeLineID *pageTLI)
+ int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
{
XLogRecPtr read_upto,
loc;
read_upto = GetFlushRecPtr();
else
read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-
- *pageTLI = ThisTimeLineID;
+ state->seg.ws_tli = ThisTimeLineID;
/*
* Check which timeline to get the record from.
read_upto = state->currTLIValidUntil;
/*
- * Setting pageTLI to our wanted record's TLI is slightly wrong;
+ * Setting ws_tli to our wanted record's TLI is slightly wrong;
* the page might begin on an older timeline if it contains a
* timeline switch, since its xlog segment will have been copied
* from the prior timeline. This is pretty harmless though, as
* nothing cares so long as the timeline doesn't go backwards. We
* should read the page header instead; FIXME someday.
*/
- *pageTLI = state->currTLI;
+ state->seg.ws_tli = state->currTLI;
/* No need to wait on a historical timeline */
break;
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
*/
- XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
+ XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
XLOG_BLCKSZ);
/* number of valid bytes in the buffer */
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
int
logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+ int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
{
return read_local_xlog_page(state, targetPagePtr, reqLen,
- targetRecPtr, cur_page, pageTLI);
+ targetRecPtr, cur_page);
}
/*
*/
bool wake_wal_senders = false;
-/*
- * These variables are used similarly to openLogFile/SegNo/Off,
- * but for walsender to read the XLOG.
- */
-static int sendFile = -1;
-static XLogSegNo sendSegNo = 0;
-static uint32 sendOff = 0;
-
-/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+static WALOpenSegment *sendSeg = NULL;
+static WALSegmentContext *sendCxt = NULL;
/*
* These variables keep track of the state of the timeline we're currently
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
/* Initialize walsender process before entering the main command loop */
/* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
+
+ /* Make sure we can remember the current read position in XLOG. */
+ sendSeg = (WALOpenSegment *)
+ MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
+ sendCxt = (WALSegmentContext *)
+ MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
+ WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
}
/*
ConditionVariableCancelSleep();
pgstat_report_wait_end();
- if (sendFile >= 0)
+ if (sendSeg->ws_file >= 0)
{
- close(sendFile);
- sendFile = -1;
+ close(sendSeg->ws_file);
+ sendSeg->ws_file = -1;
}
if (MyReplicationSlot != NULL)
*/
static int
logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+ XLogRecPtr targetRecPtr, char *cur_page)
{
XLogRecPtr flushptr;
int count;
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+ XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
return count;
}
* more than one.
*/
static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
{
char *p;
XLogRecPtr recptr;
int segbytes;
int readbytes;
- startoff = XLogSegmentOffset(recptr, wal_segment_size);
+ startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
+ if (sendSeg->ws_file < 0 ||
+ !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
{
char path[MAXPGPATH];
/* Switch to another logfile segment */
- if (sendFile >= 0)
- close(sendFile);
+ if (sendSeg->ws_file >= 0)
+ close(sendSeg->ws_file);
- XLByteToSeg(recptr, sendSegNo, wal_segment_size);
+ XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
/*-------
* When reading from a historic timeline, and there is a timeline
* used portion of the old segment is copied to the new file.
*-------
*/
- curFileTimeLine = sendTimeLine;
+ sendSeg->ws_tli = sendTimeLine;
if (sendTimeLineIsHistoric)
{
XLogSegNo endSegNo;
- XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
- if (sendSegNo == endSegNo)
- curFileTimeLine = sendTimeLineNextTLI;
+ XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+ if (sendSeg->ws_segno == endSegNo)
+ sendSeg->ws_tli = sendTimeLineNextTLI;
}
- XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
+ XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
- sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (sendFile < 0)
+ sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (sendSeg->ws_file < 0)
{
/*
* If the file is not found, assume it's because the standby
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(curFileTimeLine, sendSegNo))));
+ XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
else
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
}
- sendOff = 0;
+ sendSeg->ws_off = 0;
}
/* Need to seek in the file? */
- if (sendOff != startoff)
+ if (sendSeg->ws_off != startoff)
{
- if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+ if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in log segment %s to offset %u: %m",
- XLogFileNameP(curFileTimeLine, sendSegNo),
+ XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
startoff)));
- sendOff = startoff;
+ sendSeg->ws_off = startoff;
}
/* How many bytes are within this segment? */
- if (nbytes > (wal_segment_size - startoff))
- segbytes = wal_segment_size - startoff;
+ if (nbytes > (segcxt->ws_segsize - startoff))
+ segbytes = segcxt->ws_segsize - startoff;
else
segbytes = nbytes;
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
- readbytes = read(sendFile, p, segbytes);
+ readbytes = read(sendSeg->ws_file, p, segbytes);
pgstat_report_wait_end();
if (readbytes < 0)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u, length %zu: %m",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- sendOff, (Size) segbytes)));
+ XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
+ sendSeg->ws_off, (Size) segbytes)));
}
else if (readbytes == 0)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("could not read from log segment %s, offset %u: read %d of %zu",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- sendOff, readbytes, (Size) segbytes)));
+ XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
+ sendSeg->ws_off, readbytes, (Size) segbytes)));
}
/* Update state for read */
recptr += readbytes;
- sendOff += readbytes;
+ sendSeg->ws_off += readbytes;
nbytes -= readbytes;
p += readbytes;
}
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLByteToSeg(startptr, segno, wal_segment_size);
+ XLByteToSeg(startptr, segno, segcxt->ws_segsize);
CheckXLogRemoved(segno, ThisTimeLineID);
/*
walsnd->needreload = false;
SpinLockRelease(&walsnd->mutex);
- if (reload && sendFile >= 0)
+ if (reload && sendSeg->ws_file >= 0)
{
- close(sendFile);
- sendFile = -1;
+ close(sendSeg->ws_file);
+ sendSeg->ws_file = -1;
goto retry;
}
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
/* close the current file. */
- if (sendFile >= 0)
- close(sendFile);
- sendFile = -1;
+ if (sendSeg->ws_file >= 0)
+ close(sendSeg->ws_file);
+ sendSeg->ws_file = -1;
/* Send CopyDone */
pq_putmessage_noblock('c', NULL, 0);
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
- XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+ XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
typedef struct XLogPageReadPrivate
{
- const char *datadir;
int tliIndex;
} XLogPageReadPrivate;
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *pageTLI);
+ int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
/*
* Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
char *errormsg;
XLogPageReadPrivate private;
- private.datadir = datadir;
private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
XLogPageReadPrivate private;
XLogRecPtr endptr;
- private.datadir = datadir;
private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
forkptr += SizeOfXLogShortPHD;
}
- private.datadir = datadir;
private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
/* XLogReader callback function, to read a WAL page */
static int
SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *pageTLI)
+ int reqLen, XLogRecPtr targetRecPtr, char *readBuf)
{
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
uint32 targetPageOff;
XLogFileName(xlogfname, targetHistory[private->tliIndex].tli,
xlogreadsegno, WalSegSz);
- snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
+ snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s",
+ xlogreader->segcxt.ws_dir, xlogfname);
xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
Assert(targetSegNo == xlogreadsegno);
- *pageTLI = targetHistory[private->tliIndex].tli;
+ xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli;
return XLOG_BLCKSZ;
}
typedef struct XLogDumpPrivate
{
TimeLineID timeline;
- char *inpath;
XLogRecPtr startptr;
XLogRecPtr endptr;
bool endptr_reached;
}
/*
- * Identify the target directory and set WalSegSz.
+ * Identify the target directory.
*
* Try to find the file in several places:
* if directory != NULL:
* XLOGDIR /
* $PGDATA / XLOGDIR /
*
- * Set the valid target directory in private->inpath.
+ * The valid target directory is returned.
*/
-static void
-identify_target_directory(XLogDumpPrivate *private, char *directory,
- char *fname)
+static char *
+identify_target_directory(char *directory, char *fname)
{
char fpath[MAXPGPATH];
if (directory != NULL)
{
if (search_directory(directory, fname))
- {
- private->inpath = pg_strdup(directory);
- return;
- }
+ return pg_strdup(directory);
/* directory / XLOGDIR */
snprintf(fpath, MAXPGPATH, "%s/%s", directory, XLOGDIR);
if (search_directory(fpath, fname))
- {
- private->inpath = pg_strdup(fpath);
- return;
- }
+ return pg_strdup(fpath);
}
else
{
/* current directory */
if (search_directory(".", fname))
- {
- private->inpath = pg_strdup(".");
- return;
- }
+ return pg_strdup(".");
/* XLOGDIR */
if (search_directory(XLOGDIR, fname))
- {
- private->inpath = pg_strdup(XLOGDIR);
- return;
- }
+ return pg_strdup(XLOGDIR);
datadir = getenv("PGDATA");
/* $PGDATA / XLOGDIR */
{
snprintf(fpath, MAXPGPATH, "%s/%s", datadir, XLOGDIR);
if (search_directory(fpath, fname))
- {
- private->inpath = pg_strdup(fpath);
- return;
- }
+ return pg_strdup(fpath);
}
}
fatal_error("could not locate WAL file \"%s\"", fname);
else
fatal_error("could not find any WAL file");
+
+ return NULL; /* not reached */
}
/*
*/
static int
XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
+ XLogRecPtr targetPtr, char *readBuff)
{
XLogDumpPrivate *private = state->private_data;
int count = XLOG_BLCKSZ;
}
}
- XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
+ XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
readBuff, count);
return count;
XLogDumpStats stats;
XLogRecord *record;
XLogRecPtr first_record;
+ char *waldir = NULL;
char *errormsg;
static struct option long_options[] = {
}
break;
case 'p':
- private.inpath = pg_strdup(optarg);
+ waldir = pg_strdup(optarg);
break;
case 'r':
{
goto bad_argument;
}
- if (private.inpath != NULL)
+ if (waldir != NULL)
{
/* validate path points to directory */
- if (!verify_directory(private.inpath))
+ if (!verify_directory(waldir))
{
pg_log_error("path \"%s\" could not be opened: %s",
- private.inpath, strerror(errno));
+ waldir, strerror(errno));
goto bad_argument;
}
}
split_path(argv[optind], &directory, &fname);
- if (private.inpath == NULL && directory != NULL)
+ if (waldir == NULL && directory != NULL)
{
- private.inpath = directory;
+ waldir = directory;
- if (!verify_directory(private.inpath))
+ if (!verify_directory(waldir))
fatal_error("could not open directory \"%s\": %s",
- private.inpath, strerror(errno));
+ waldir, strerror(errno));
}
- identify_target_directory(&private, private.inpath, fname);
- fd = open_file_in_directory(private.inpath, fname);
+ waldir = identify_target_directory(waldir, fname);
+ fd = open_file_in_directory(waldir, fname);
if (fd < 0)
fatal_error("could not open file \"%s\"", fname);
close(fd);
/* ignore directory, already have that */
split_path(argv[optind + 1], &directory, &fname);
- fd = open_file_in_directory(private.inpath, fname);
+ fd = open_file_in_directory(waldir, fname);
if (fd < 0)
fatal_error("could not open file \"%s\"", fname);
close(fd);
}
}
else
- identify_target_directory(&private, private.inpath, NULL);
+ waldir = identify_target_directory(waldir, NULL);
/* we don't know what to print */
if (XLogRecPtrIsInvalid(private.startptr))
/* done with argument parsing, do the actual work */
/* we have everything we need, start reading */
- xlogreader_state = XLogReaderAllocate(WalSegSz, XLogDumpReadPage,
+ xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
&private);
if (!xlogreader_state)
fatal_error("out of memory");
#include "access/xlogrecord.h"
+/* WALOpenSegment represents a WAL segment being read. */
+typedef struct WALOpenSegment
+{
+ int ws_file; /* segment file descriptor */
+ XLogSegNo ws_segno; /* segment number */
+ uint32 ws_off; /* offset in the segment */
+ TimeLineID ws_tli; /* timeline ID of the currently open file */
+} WALOpenSegment;
+
+/* WALSegmentContext carries context information about WAL segments to read */
+typedef struct WALSegmentContext
+{
+ char ws_dir[MAXPGPATH];
+ int ws_segsize;
+} WALSegmentContext;
+
typedef struct XLogReaderState XLogReaderState;
/* Function type definition for the read_page callback */
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
- char *readBuf,
- TimeLineID *pageTLI);
+ char *readBuf);
typedef struct
{
* ----------------------------------------
*/
- /*
- * Segment size of the to-be-parsed data (mandatory).
- */
- int wal_segment_size;
-
/*
* Data input callback (mandatory).
*
* actual WAL record it's interested in. In that case, targetRecPtr can
* be used to determine which timeline to read the page from.
*
- * The callback shall set *pageTLI to the TLI of the file the page was
- * read from. It is currently used only for error reporting purposes, to
- * reconstruct the name of the WAL file where an error occurred.
+ * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+ * read from.
*/
XLogPageReadCB read_page;
char *readBuf;
uint32 readLen;
- /* last read segment, segment offset, TLI for data currently in readBuf */
- XLogSegNo readSegNo;
- uint32 readOff;
- TimeLineID readPageTLI;
+ /* last read XLOG position for data currently in readBuf */
+ WALSegmentContext segcxt;
+ WALOpenSegment seg;
/*
* beginning of prior page read, and its TLI. Doesn't necessarily
/* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
+ const char *waldir,
XLogPageReadCB pagereadfunc,
void *private_data);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
+/* Initialize supporting structures */
+extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+ int segsize, const char *waldir);
+
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
XLogRecPtr recptr, char **errormsg);
extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *cur_page,
- TimeLineID *pageTLI);
+ XLogRecPtr targetRecPtr, char *cur_page);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
extern int logical_read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr,
- char *cur_page, TimeLineID *pageTLI);
+ char *cur_page);
#endif