errmsg("out of memory"),
errdetail("Failed while allocating a WAL reading processor.")));
- record = XLogReadRecord(xlogreader, lsn, &errormsg);
+ XLogBeginRead(xlogreader, lsn);
+ record = XLogReadRecord(xlogreader, &errormsg);
if (record == NULL)
ereport(ERROR,
(errcode_for_file_access(),
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
+static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
int emode, bool fetching_ckpt);
static void CheckRecoveryConsistency(void);
static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
}
/*
- * Attempt to read an XLOG record.
+ * Attempt to read the next XLOG record.
*
- * If RecPtr is valid, try to read a record at that position. Otherwise
- * try to read a record just after the last one previously read.
+ * Before first call, the reader needs to be positioned to the first record
+ * by calling XLogBeginRead().
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*/
static XLogRecord *
-ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
+ReadRecord(XLogReaderState *xlogreader, int emode,
bool fetching_ckpt)
{
XLogRecord *record;
/* Pass through parameters to XLogPageRead */
private->fetching_ckpt = fetching_ckpt;
private->emode = emode;
- private->randAccess = (RecPtr != InvalidXLogRecPtr);
+ private->randAccess = (xlogreader->ReadRecPtr != InvalidXLogRecPtr);
/* This is the first attempt to read this page. */
lastSourceFailed = false;
{
char *errormsg;
- record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+ record = XLogReadRecord(xlogreader, &errormsg);
ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL)
* shouldn't loop anymore in that case.
*/
if (errormsg)
- ereport(emode_for_corrupt_record(emode,
- RecPtr ? RecPtr : EndRecPtr),
+ ereport(emode_for_corrupt_record(emode, EndRecPtr),
(errmsg_internal("%s", errormsg) /* already translated */ ));
}
wal_segment_size);
XLogFileName(fname, xlogreader->seg.ws_tli, segno,
wal_segment_size);
- ereport(emode_for_corrupt_record(emode,
- RecPtr ? RecPtr : EndRecPtr),
+ ereport(emode_for_corrupt_record(emode, EndRecPtr),
(errmsg("unexpected timeline ID %u in log segment %s, offset %u",
xlogreader->latestPageTLI,
fname,
*/
if (checkPoint.redo < checkPointLoc)
{
- if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
+ XLogBeginRead(xlogreader, checkPoint.redo);
+ if (!ReadRecord(xlogreader, LOG, false))
ereport(FATAL,
(errmsg("could not find redo location referenced by checkpoint record"),
errhint("If you are restoring from a backup, touch \"%s/recovery.signal\" and add required recovery options.\n"
if (checkPoint.redo < RecPtr)
{
/* back up to find the record */
- record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
+ XLogBeginRead(xlogreader, checkPoint.redo);
+ record = ReadRecord(xlogreader, PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
- record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+ record = ReadRecord(xlogreader, LOG, false);
}
if (record != NULL)
}
/* Else, try to fetch the next WAL record */
- record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
+ record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
/*
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
*/
- record = ReadRecord(xlogreader, LastRec, PANIC, false);
+ XLogBeginRead(xlogreader, LastRec);
+ record = ReadRecord(xlogreader, PANIC, false);
EndOfLog = EndRecPtr;
/*
return NULL;
}
- record = ReadRecord(xlogreader, RecPtr, LOG, true);
+ XLogBeginRead(xlogreader, RecPtr);
+ record = ReadRecord(xlogreader, LOG, true);
if (record == NULL)
{
snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
}
+/*
+ * Begin reading WAL at 'RecPtr'.
+ *
+ * 'RecPtr' should point to the beginnning of a valid WAL record. Pointing at
+ * the beginning of a page is also OK, if there is a new record right after
+ * the page header, i.e. not a continuation.
+ *
+ * This does not make any attempt to read the WAL yet, and hence cannot fail.
+ * If the starting address is not correct, the first call to XLogReadRecord()
+ * will error out.
+ */
+void
+XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
+{
+ Assert(!XLogRecPtrIsInvalid(RecPtr));
+
+ ResetDecoder(state);
+
+ /* Begin at the passed-in record pointer. */
+ state->EndRecPtr = RecPtr;
+ state->ReadRecPtr = InvalidXLogRecPtr;
+}
+
/*
* Attempt to read an XLOG record.
*
- * If RecPtr is valid, try to read a record at that position. Otherwise
- * try to read a record just after the last one previously read.
+ * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
+ * to XLogReadRecord().
*
* If the read_page callback fails to read the requested data, NULL is
* returned. The callback is expected to have reported the error; errormsg
* valid until the next call to XLogReadRecord.
*/
XLogRecord *
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
+XLogReadRecord(XLogReaderState *state, char **errormsg)
{
+ XLogRecPtr RecPtr;
XLogRecord *record;
XLogRecPtr targetPagePtr;
bool randAccess;
ResetDecoder(state);
- if (RecPtr == InvalidXLogRecPtr)
- {
- /* No explicit start point; read the record after the one we just read */
- RecPtr = state->EndRecPtr;
+ RecPtr = state->EndRecPtr;
- if (state->ReadRecPtr == InvalidXLogRecPtr)
- randAccess = true;
+ if (state->ReadRecPtr != InvalidXLogRecPtr)
+ {
+ /* read the record after the one we just read */
/*
- * RecPtr is pointing to end+1 of the previous WAL record. If we're
- * at a page boundary, no more records can fit on the current page. We
- * must skip over the page header, but we can't do that until we've
- * read in the page, since the header size is variable.
+ * EndRecPtr is pointing to end+1 of the previous WAL record. If
+ * we're at a page boundary, no more records can fit on the current
+ * page. We must skip over the page header, but we can't do that until
+ * we've read in the page, since the header size is variable.
*/
}
else
/*
* Caller supplied a position to start at.
*
- * In this case, the passed-in record pointer should already be
- * pointing to a valid record starting position.
+ * In this case, EndRecPtr should already be pointing to a valid
+ * record starting position.
*/
Assert(XRecOffIsValid(RecPtr));
randAccess = true;
/*
* Find the first record with an lsn >= RecPtr.
*
- * Useful for checking whether RecPtr is a valid xlog address for reading, and
- * to find the first valid address after some address when dumping records for
- * debugging purposes.
+ * This is different from XLogBeginRead() in that RecPtr doesn't need to point
+ * to a valid record boundary. Useful for checking whether RecPtr is a valid
+ * xlog address for reading, and to find the first valid address after some
+ * address when dumping records for debugging purposes.
+ *
+ * This positions the reader, like XLogBeginRead(), so that the next call to
+ * XLogReadRecord() will read the next valid record.
*/
XLogRecPtr
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
{
- XLogReaderState saved_state = *state;
XLogRecPtr tmpRecPtr;
XLogRecPtr found = InvalidXLogRecPtr;
XLogPageHeader header;
* because either we're at the first record after the beginning of a page
* or we just jumped over the remaining data of a continuation.
*/
- while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
+ XLogBeginRead(state, tmpRecPtr);
+ while (XLogReadRecord(state, &errormsg) != NULL)
{
- /* continue after the record */
- tmpRecPtr = InvalidXLogRecPtr;
-
/* past the record we've found, break out */
if (RecPtr <= state->ReadRecPtr)
{
+ /* Rewind the reader to the beginning of the last record. */
found = state->ReadRecPtr;
- goto out;
+ XLogBeginRead(state, found);
+ return found;
}
}
err:
-out:
- /* Reset state to what we had before finding the record */
- state->ReadRecPtr = saved_state.ReadRecPtr;
- state->EndRecPtr = saved_state.EndRecPtr;
XLogReaderInvalReadState(state);
- return found;
+ return InvalidXLogRecPtr;
}
#endif /* FRONTEND */
void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
- XLogRecPtr startptr;
ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = slot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, slot->data.restart_lsn);
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
(uint32) (slot->data.restart_lsn >> 32),
char *err = NULL;
/* the read_page callback waits for new WAL */
- record = XLogReadRecord(ctx->reader, startptr, &err);
+ record = XLogReadRecord(ctx->reader, &err);
if (err)
elog(ERROR, "%s", err);
if (!record)
elog(ERROR, "no record found"); /* shouldn't happen */
- startptr = InvalidXLogRecPtr;
-
LogicalDecodingProcessRecord(ctx, ctx->reader);
/* only continue till we found a consistent spot */
MemoryContext per_query_ctx;
MemoryContext oldcontext;
XLogRecPtr end_of_wal;
- XLogRecPtr startptr;
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
* xacts that committed after the slot's confirmed_flush can be
* accumulated into reorder buffers.
*/
- startptr = MyReplicationSlot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
/* Decode until we run out of records */
- while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
- (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+ while (ctx->reader->EndRecPtr < end_of_wal)
{
XLogRecord *record;
char *errm = NULL;
- record = XLogReadRecord(ctx->reader, startptr, &errm);
+ record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "%s", errm);
- /*
- * Now that we've set up the xlog reader state, subsequent calls
- * pass InvalidXLogRecPtr to say "continue from last record"
- */
- startptr = InvalidXLogRecPtr;
-
/*
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
* store the description into our tuplestore.
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr startlsn;
XLogRecPtr retlsn;
PG_TRY();
* Start reading at the slot's restart_lsn, which we know to point to
* a valid record.
*/
- startlsn = MyReplicationSlot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
/* Initialize our return value in case we don't do anything */
retlsn = MyReplicationSlot->data.confirmed_flush;
InvalidateSystemCaches();
/* Decode at least one record, until we run out of records */
- while ((!XLogRecPtrIsInvalid(startlsn) &&
- startlsn < moveto) ||
- (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
- ctx->reader->EndRecPtr < moveto))
+ while (ctx->reader->EndRecPtr < moveto)
{
char *errm = NULL;
XLogRecord *record;
* Read records. No changes are generated in fast_forward mode,
* but snapbuilder/slot statuses are updated properly.
*/
- record = XLogReadRecord(ctx->reader, startlsn, &errm);
+ record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "%s", errm);
- /* Read sequentially from now on */
- startlsn = InvalidXLogRecPtr;
-
/*
* Process the record. Storage-level changes are ignored in
* fast_forward mode, but other modules (such as snapbuilder)
static volatile sig_atomic_t replication_active = false;
static LogicalDecodingContext *logical_decoding_ctx = NULL;
-static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
/* A sample associating a WAL location with the time it was written. */
typedef struct
pq_endmessage(&buf);
pq_flush();
-
/* Start reading WAL from the oldest required WAL. */
- logical_startptr = MyReplicationSlot->data.restart_lsn;
+ XLogBeginRead(logical_decoding_ctx->reader,
+ MyReplicationSlot->data.restart_lsn);
/*
* Report the location after which we'll send out further commits as the
*/
WalSndCaughtUp = false;
- record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
- logical_startptr = InvalidXLogRecPtr;
+ record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
/* xlog record was invalid */
if (errm != NULL)
if (xlogreader == NULL)
pg_fatal("out of memory");
+ XLogBeginRead(xlogreader, startpoint);
do
{
- record = XLogReadRecord(xlogreader, startpoint, &errormsg);
+ record = XLogReadRecord(xlogreader, &errormsg);
if (record == NULL)
{
- XLogRecPtr errptr;
-
- errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
+ XLogRecPtr errptr = xlogreader->EndRecPtr;
if (errormsg)
pg_fatal("could not read WAL record at %X/%X: %s",
extractPageInfo(xlogreader);
- startpoint = InvalidXLogRecPtr; /* continue reading at next record */
-
} while (xlogreader->ReadRecPtr != endpoint);
XLogReaderFree(xlogreader);
if (xlogreader == NULL)
pg_fatal("out of memory");
- record = XLogReadRecord(xlogreader, ptr, &errormsg);
+ XLogBeginRead(xlogreader, ptr);
+ record = XLogReadRecord(xlogreader, &errormsg);
if (record == NULL)
{
if (errormsg)
{
uint8 info;
- record = XLogReadRecord(xlogreader, searchptr, &errormsg);
+ XLogBeginRead(xlogreader, searchptr);
+ record = XLogReadRecord(xlogreader, &errormsg);
if (record == NULL)
{
for (;;)
{
/* try to read the next record */
- record = XLogReadRecord(xlogreader_state, first_record, &errormsg);
+ record = XLogReadRecord(xlogreader_state, &errormsg);
if (!record)
{
if (!config.follow || private.endptr_reached)
}
}
- /* after reading the first record, continue at next one */
- first_record = InvalidXLogRecPtr;
-
/* apply all specified filters */
if (config.filter_by_rmgr != -1 &&
config.filter_by_rmgr != record->xl_rmid)
* how to use the XLogReader infrastructure.
*
* The basic idea is to allocate an XLogReaderState via
- * XLogReaderAllocate(), and call XLogReadRecord() until it returns NULL.
+ * XLogReaderAllocate(), position the reader to the first record with
+ * XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
+ * until it returns NULL.
*
* After reading a record with XLogReadRecord(), it's decomposed into
* the per-block and main data parts, and the parts can be accessed
/*
* Start and end point of last record read. EndRecPtr is also used as the
- * position to read next, if XLogReadRecord receives an invalid recptr.
+ * position to read next. Calling XLogBeginRead() sets EndRecPtr to the
+ * starting position and ReadRecPtr to invalid.
*/
XLogRecPtr ReadRecPtr; /* start of last record read */
XLogRecPtr EndRecPtr; /* end+1 of last record read */
extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir);
+/* Position the XLogReader to given record */
+extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
+#ifdef FRONTEND
+extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
+#endif /* FRONTEND */
+
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
- XLogRecPtr recptr, char **errormsg);
+ char **errormsg);
/* Validate a page */
extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
XLogRecPtr recptr, char *phdr);
-#ifdef FRONTEND
-extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
-#endif /* FRONTEND */
-
/*
* Error information from WALRead that both backend and frontend caller can
* process. Currently only errors from pg_pread can be reported.