*/
#include "postgres.h"
+#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "storage/sinval.h"
#include "utils/timestamp.h"
+/*
+ * Parse the WAL format of a xact commit and abort records into a easier to
+ * understand format.
+ *
+ * This routines are in xactdesc.c because they're accessed in backend (when
+ * replaying WAL) and frontend (pg_xlogdump) code. This file is the only xact
+ * specific one shared between both. They're complicated enough that
+ * duplication would be bothersome.
+ */
+
+void
+ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
+{
+ char *data = ((char *) xlrec) + MinSizeOfXactCommit;
+
+ memset(parsed, 0, sizeof(*parsed));
+
+ parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+ parsed->xact_time = xlrec->xact_time;
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) data;
+
+ parsed->xinfo = xl_xinfo->xinfo;
+
+ data += sizeof(xl_xact_xinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+ {
+ xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+ parsed->dbId = xl_dbinfo->dbId;
+ parsed->tsId = xl_dbinfo->tsId;
+
+ data += sizeof(xl_xact_dbinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
+
+ parsed->nsubxacts = xl_subxacts->nsubxacts;
+ parsed->subxacts = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += parsed->nsubxacts * sizeof(TransactionId);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ parsed->nrels = xl_relfilenodes->nrels;
+ parsed->xnodes = xl_relfilenodes->xnodes;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ xl_xact_invals *xl_invals = (xl_xact_invals *) data;
+
+ parsed->nmsgs = xl_invals->nmsgs;
+ parsed->msgs = xl_invals->msgs;
+
+ data += MinSizeOfXactInvals;
+ data += xl_invals->nmsgs * sizeof(SharedInvalidationMessage);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
+ xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+
+ parsed->twophase_xid = xl_twophase->xid;
+
+ data += sizeof(xl_xact_twophase);
+ }
+}
+
+void
+ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
+{
+ char *data = ((char *) xlrec) + MinSizeOfXactAbort;
+
+ memset(parsed, 0, sizeof(*parsed));
+
+ parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+ parsed->xact_time = xlrec->xact_time;
+
+ if (info & XLOG_XACT_HAS_INFO)
+ {
+ xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) data;
+
+ parsed->xinfo = xl_xinfo->xinfo;
+
+ data += sizeof(xl_xact_xinfo);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
+
+ parsed->nsubxacts = xl_subxacts->nsubxacts;
+ parsed->subxacts = xl_subxacts->subxacts;
+
+ data += MinSizeOfXactSubxacts;
+ data += parsed->nsubxacts * sizeof(TransactionId);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+ parsed->nrels = xl_relfilenodes->nrels;
+ parsed->xnodes = xl_relfilenodes->xnodes;
+
+ data += MinSizeOfXactRelfilenodes;
+ data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
+ xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
+
+ parsed->twophase_xid = xl_twophase->xid;
+
+ data += sizeof(xl_xact_twophase);
+ }
+}
static void
-xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
{
+ xl_xact_parsed_commit parsed;
int i;
- TransactionId *subxacts;
- subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+ ParseCommitRecord(info, xlrec, &parsed);
+
+ /* If this is a prepared xact, show the xid of the original xact */
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (xlrec->nrels > 0)
+ if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed.nrels; i++)
{
- char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (xlrec->nsubxacts > 0)
+ if (parsed.nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", subxacts[i]);
+ for (i = 0; i < parsed.nsubxacts; i++)
+ appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
- if (xlrec->nmsgs > 0)
+ if (parsed.nmsgs > 0)
{
- SharedInvalidationMessage *msgs;
-
- msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
-
- if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
+ if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
- xlrec->dbId, xlrec->tsId);
+ parsed.dbId, parsed.tsId);
appendStringInfoString(buf, "; inval msgs:");
- for (i = 0; i < xlrec->nmsgs; i++)
+ for (i = 0; i < parsed.nmsgs; i++)
{
- SharedInvalidationMessage *msg = &msgs[i];
+ SharedInvalidationMessage *msg = &parsed.msgs[i];
if (msg->id >= 0)
appendStringInfo(buf, " catcache %d", msg->id);
appendStringInfo(buf, " unknown id %d", msg->id);
}
}
+
+ if (XactCompletionForceSyncCommit(parsed.xinfo))
+ appendStringInfo(buf, "; sync");
}
static void
-xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
{
+ xl_xact_parsed_abort parsed;
int i;
- appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+ ParseAbortRecord(info, xlrec, &parsed);
- if (xlrec->nsubxacts > 0)
- {
- appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xlrec->subxacts[i]);
- }
-}
-
-static void
-xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
-{
- int i;
+ /* If this is a prepared xact, show the xid of the original xact */
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (xlrec->nrels > 0)
+ if (parsed.nrels > 0)
{
appendStringInfoString(buf, "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed.nrels; i++)
{
- char *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (xlrec->nsubxacts > 0)
- {
- TransactionId *xacts = (TransactionId *)
- &xlrec->xnodes[xlrec->nrels];
+ if (parsed.nsubxacts > 0)
+ {
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- appendStringInfo(buf, " %u", xacts[i]);
+ for (i = 0; i < parsed.nsubxacts; i++)
+ appendStringInfo(buf, " %u", parsed.subxacts[i]);
}
}
xact_desc(StringInfo buf, XLogReaderState *record)
{
char *rec = XLogRecGetData(record);
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+ uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
- xact_desc_commit_compact(buf, xlrec);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
- xact_desc_commit(buf, xlrec);
+ xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
}
- else if (info == XLOG_XACT_ABORT)
+ else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
- xact_desc_abort(buf, xlrec);
- }
- else if (info == XLOG_XACT_COMMIT_PREPARED)
- {
- xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
-
- appendStringInfo(buf, "%u: ", xlrec->xid);
- xact_desc_commit(buf, &xlrec->crec);
- }
- else if (info == XLOG_XACT_ABORT_PREPARED)
- {
- xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
-
- appendStringInfo(buf, "%u: ", xlrec->xid);
- xact_desc_abort(buf, &xlrec->arec);
+ xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
}
else if (info == XLOG_XACT_ASSIGNMENT)
{
{
const char *id = NULL;
- switch (info & ~XLR_INFO_MASK)
+ switch (info & XLOG_XACT_OPMASK)
{
case XLOG_XACT_COMMIT:
id = "COMMIT";
case XLOG_XACT_ASSIGNMENT:
id = "ASSIGNMENT";
break;
- case XLOG_XACT_COMMIT_COMPACT:
- id = "COMMIT_COMPACT";
- break;
}
return id;
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
- xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
START_CRIT_SECTION();
MyPgXact->delayChkpt = true;
/* Emit the XLOG commit record */
- xlrec.xid = xid;
-
- xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
-
- xlrec.crec.dbId = MyDatabaseId;
- xlrec.crec.tsId = MyDatabaseTableSpace;
-
- xlrec.crec.xact_time = GetCurrentTimestamp();
- xlrec.crec.nrels = nrels;
- xlrec.crec.nsubxacts = nchildren;
- xlrec.crec.nmsgs = ninvalmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- /* dump cache invalidation messages */
- if (ninvalmsgs > 0)
- XLogRegisterData((char *) invalmsgs,
- ninvalmsgs * sizeof(SharedInvalidationMessage));
-
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
+ recptr = XactLogCommitRecord(GetCurrentTimestamp(),
+ nchildren, children, nrels, rels,
+ ninvalmsgs, invalmsgs,
+ initfileinval, false,
+ xid);
/*
* We don't currently try to sleep before flush here ... nor is there any
int nrels,
RelFileNode *rels)
{
- xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
/*
START_CRIT_SECTION();
/* Emit the XLOG abort record */
- xlrec.xid = xid;
- xlrec.arec.xact_time = GetCurrentTimestamp();
- xlrec.arec.nrels = nrels;
- xlrec.arec.nsubxacts = nchildren;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
+ recptr = XactLogAbortRecord(GetCurrentTimestamp(),
+ nchildren, children,
+ nrels, rels,
+ xid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
SetCurrentTransactionStopTimestamp();
- /*
- * Do we need the long commit record? If not, use the compact format.
- *
- * For now always use the non-compact version if wal_level=logical, so
- * we can hide commits from other databases. TODO: In the future we
- * should merge compact and non-compact commits and use a flags
- * variable to determine if it contains subxacts, relations or
- * invalidation messages, that's more extensible and degrades more
- * gracefully. Till then, it's just 20 bytes of overhead.
- */
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
- XLogLogicalInfoActive())
- {
- xl_xact_commit xlrec;
-
- /*
- * Set flags required for recovery processing of commits.
- */
- xlrec.xinfo = 0;
- if (RelcacheInitFileInval)
- xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
- if (forceSyncCommit)
- xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
- xlrec.dbId = MyDatabaseId;
- xlrec.tsId = MyDatabaseTableSpace;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- xlrec.nmsgs = nmsgs;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels,
- nrels * sizeof(RelFileNode));
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
- /* dump shared cache invalidation messages */
- if (nmsgs > 0)
- XLogRegisterData((char *) invalMessages,
- nmsgs * sizeof(SharedInvalidationMessage));
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
- }
- else
- {
- xl_xact_commit_compact xlrec;
-
- xlrec.xact_time = xactStopTimestamp;
- xlrec.nsubxacts = nchildren;
-
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact);
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT);
- }
+ XactLogCommitRecord(xactStopTimestamp,
+ nchildren, children, nrels, rels,
+ nmsgs, invalMessages,
+ RelcacheInitFileInval, forceSyncCommit,
+ InvalidTransactionId /* plain commit */);
}
/*
RelFileNode *rels;
int nchildren;
TransactionId *children;
- xl_xact_abort xlrec;
+ TimestampTz xact_time;
/*
* If we haven't been assigned an XID, nobody will care whether we aborted
/* Write the ABORT record */
if (isSubXact)
- xlrec.xact_time = GetCurrentTimestamp();
+ xact_time = GetCurrentTimestamp();
else
{
SetCurrentTransactionStopTimestamp();
- xlrec.xact_time = xactStopTimestamp;
+ xact_time = xactStopTimestamp;
}
- xlrec.nrels = nrels;
- xlrec.nsubxacts = nchildren;
- XLogBeginInsert();
- XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
-
- /* dump rels to delete */
- if (nrels > 0)
- XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
- /* dump committed child Xids */
- if (nchildren > 0)
- XLogRegisterData((char *) children,
- nchildren * sizeof(TransactionId));
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT);
+ XactLogAbortRecord(xact_time,
+ nchildren, children,
+ nrels, rels,
+ InvalidTransactionId);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
* XLOG support routines
*/
+
+/*
+ * Log the commit record for a plain or twophase transaction commit.
+ *
+ * A 2pc commit will be emitted when twophase_xid is valid, a plain one
+ * otherwise.
+ */
+XLogRecPtr
+XactLogCommitRecord(TimestampTz commit_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInval, bool forceSync,
+ TransactionId twophase_xid)
+{
+ xl_xact_commit xlrec;
+ xl_xact_xinfo xl_xinfo;
+ xl_xact_dbinfo xl_dbinfo;
+ xl_xact_subxacts xl_subxacts;
+ xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_invals xl_invals;
+ xl_xact_twophase xl_twophase;
+
+ uint8 info;
+
+ Assert(CritSectionCount > 0);
+
+ xl_xinfo.xinfo = 0;
+
+ /* decide between a plain and 2pc commit */
+ if (!TransactionIdIsValid(twophase_xid))
+ info = XLOG_XACT_COMMIT;
+ else
+ info = XLOG_XACT_COMMIT_PREPARED;
+
+ /* First figure out and collect all the information needed */
+
+ xlrec.xact_time = commit_time;
+
+ if (relcacheInval)
+ xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+ if (forceSyncCommit)
+ xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+ /*
+ * Relcache invalidations requires information about the current database
+ * and so does logical decoding.
+ */
+ if (nmsgs > 0 || XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
+ }
+
+ if (nsubxacts > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (nmsgs > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
+ xl_invals.nmsgs = nmsgs;
+ }
+
+ if (TransactionIdIsValid(twophase_xid))
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+ xl_twophase.xid = twophase_xid;
+ }
+
+ if (xl_xinfo.xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* Then include all the collected data into the commit record. */
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit));
+
+ if (xl_xinfo.xinfo != 0)
+ XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
+ {
+ XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
+ XLogRegisterData((char *) msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+
+ return XLogInsert(RM_XACT_ID, info);
+}
+
+/*
+ * Log the commit record for a plain or twophase transaction abort.
+ *
+ * A 2pc abort will be emitted when twophase_xid is valid, a plain one
+ * otherwise.
+ */
+XLogRecPtr
+XactLogAbortRecord(TimestampTz abort_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ TransactionId twophase_xid)
+{
+ xl_xact_abort xlrec;
+ xl_xact_xinfo xl_xinfo;
+ xl_xact_subxacts xl_subxacts;
+ xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_twophase xl_twophase;
+
+ uint8 info;
+
+ Assert(CritSectionCount > 0);
+
+ xl_xinfo.xinfo = 0;
+
+ /* decide between a plain and 2pc abort */
+ if (!TransactionIdIsValid(twophase_xid))
+ info = XLOG_XACT_ABORT;
+ else
+ info = XLOG_XACT_ABORT_PREPARED;
+
+
+ /* First figure out and collect all the information needed */
+
+ xlrec.xact_time = abort_time;
+
+ if (nsubxacts > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+ xl_subxacts.nsubxacts = nsubxacts;
+ }
+
+ if (nrels > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES;
+ xl_relfilenodes.nrels = nrels;
+ }
+
+ if (TransactionIdIsValid(twophase_xid))
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+ xl_twophase.xid = twophase_xid;
+ }
+
+ if (xl_xinfo.xinfo != 0)
+ info |= XLOG_XACT_HAS_INFO;
+
+ /* Then include all the collected data into the abort record. */
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+
+ if (xl_xinfo.xinfo != 0)
+ XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+ {
+ XLogRegisterData((char *) (&xl_subxacts),
+ MinSizeOfXactSubxacts);
+ XLogRegisterData((char *) subxacts,
+ nsubxacts * sizeof(TransactionId));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES)
+ {
+ XLogRegisterData((char *) (&xl_relfilenodes),
+ MinSizeOfXactRelfilenodes);
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+
+ return XLogInsert(RM_XACT_ID, info);
+}
+
/*
* Before 9.0 this was a fairly short function, but now it performs many
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
- TimestampTz commit_time,
- TransactionId *sub_xids, int nsubxacts,
- SharedInvalidationMessage *inval_msgs, int nmsgs,
- RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId,
- uint32 xinfo)
+xact_redo_commit(xl_xact_parsed_commit *parsed,
+ TransactionId xid,
+ XLogRecPtr lsn)
{
TransactionId max_xid;
int i;
- max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+ max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
/*
* Make sure nextXid is beyond any XID mentioned in the record.
}
/* Set the transaction commit timestamp and metadata */
- TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
- commit_time, InvalidCommitTsNodeId, false);
+ TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
+ parsed->xact_time, InvalidCommitTsNodeId,
+ false);
if (standbyState == STANDBY_DISABLED)
{
/*
* Mark the transaction committed in pg_clog.
*/
- TransactionIdCommitTree(xid, nsubxacts, sub_xids);
+ TransactionIdCommitTree(xid, parsed->nsubxacts, parsed->subxacts);
}
else
{
* bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe.
*/
- TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
+ TransactionIdAsyncCommitTree(
+ xid, parsed->nsubxacts, parsed->subxacts, lsn);
/*
* We must mark clog before we update the ProcArray.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(
+ xid, parsed->nsubxacts, parsed->subxacts, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
* occurs in CommitTransaction().
*/
- ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
- XactCompletionRelcacheInitFileInval(xinfo),
- dbId, tsId);
+ ProcessCommittedInvalidationMessages(
+ parsed->msgs, parsed->nmsgs,
+ XactCompletionRelcacheInitFileInval(parsed->xinfo),
+ parsed->dbId, parsed->tsId);
/*
* Release locks, if any. We do this for both two phase and normal one
}
/* Make sure files supposed to be dropped are dropped */
- if (nrels > 0)
+ if (parsed->nrels > 0)
{
/*
* First update minimum recovery point to cover this WAL record. Once
*/
XLogFlush(lsn);
- for (i = 0; i < nrels; i++)
+ for (i = 0; i < parsed->nrels; i++)
{
- SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xnodes[i], fork);
+ XLogDropRelation(parsed->xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
* minRecoveryPoint during recovery) helps to reduce that problem window,
* for any user that requested ForceSyncCommit().
*/
- if (XactCompletionForceSyncCommit(xinfo))
+ if (XactCompletionForceSyncCommit(parsed->xinfo))
XLogFlush(lsn);
}
-/*
- * Utility function to call xact_redo_commit_internal after breaking down xlrec
- */
-static void
-xact_redo_commit(xl_xact_commit *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- TransactionId *subxacts;
- SharedInvalidationMessage *inval_msgs;
-
- /* subxid array follows relfilenodes */
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- /* invalidation messages array follows subxids */
- inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- subxacts, xlrec->nsubxacts,
- inval_msgs, xlrec->nmsgs,
- xlrec->xnodes, xlrec->nrels,
- xlrec->dbId,
- xlrec->tsId,
- xlrec->xinfo);
-}
-
-/*
- * Utility function to call xact_redo_commit_internal for compact form of message.
- */
-static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
- TransactionId xid, XLogRecPtr lsn)
-{
- xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
- xlrec->subxacts, xlrec->nsubxacts,
- NULL, 0, /* inval msgs */
- NULL, 0, /* relfilenodes */
- InvalidOid, /* dbId */
- InvalidOid, /* tsId */
- 0); /* xinfo */
-}
-
/*
* Be careful with the order of execution, as with xact_redo_commit().
* The two functions are similar but differ in key places.
* because subtransaction commit is never WAL logged.
*/
static void
-xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
{
- TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
-
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+ int i;
+ TransactionId max_xid;
/*
* Make sure nextXid is beyond any XID mentioned in the record.
* hold a lock while checking this. We still acquire the lock to modify
* it, though.
*/
+ max_xid = TransactionIdLatest(xid,
+ parsed->nsubxacts,
+ parsed->subxacts);
+
if (TransactionIdFollowsOrEquals(max_xid,
ShmemVariableCache->nextXid))
{
if (standbyState == STANDBY_DISABLED)
{
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts);
}
else
{
RecordKnownAssignedTransactionIds(max_xid);
/* Mark the transaction aborted in pg_clog, no need for async stuff */
- TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+ TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts);
/*
* We must update the ProcArray after we have marked clog.
*/
- ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+ ExpireTreeKnownAssignedTransactionIds(
+ xid, parsed->nsubxacts, parsed->subxacts, max_xid);
/*
* There are no flat files that need updating, nor invalidation
/*
* Release locks, if any. There are no invalidations to send.
*/
- StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+ StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
}
/* Make sure files supposed to be dropped are dropped */
- for (i = 0; i < xlrec->nrels; i++)
+ for (i = 0; i < parsed->nrels; i++)
{
- SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+ SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId);
ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++)
- XLogDropRelation(xlrec->xnodes[i], fork);
+ XLogDropRelation(parsed->xnodes[i], fork);
smgrdounlink(srel, true);
smgrclose(srel);
}
void
xact_redo(XLogReaderState *record)
{
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
/* Backup blocks are not used in xact records */
Assert(!XLogRecHasAnyBlockRefs(record));
- if (info == XLOG_XACT_COMMIT_COMPACT)
- {
- xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
-
- xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr);
- }
- else if (info == XLOG_XACT_COMMIT)
+ if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(XLogRecGetInfo(record), xlrec,
+ &parsed);
- xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr);
+ if (info == XLOG_XACT_COMMIT)
+ {
+ Assert(!TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_commit(&parsed, XLogRecGetXid(record),
+ record->EndRecPtr);
+ }
+ else
+ {
+ Assert(TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_commit(&parsed, parsed.twophase_xid,
+ record->EndRecPtr);
+ RemoveTwoPhaseFile(parsed.twophase_xid, false);
+ }
}
- else if (info == XLOG_XACT_ABORT)
+ else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ ParseAbortRecord(XLogRecGetInfo(record), xlrec,
+ &parsed);
- xact_redo_abort(xlrec, XLogRecGetXid(record));
+ if (info == XLOG_XACT_ABORT)
+ {
+ Assert(!TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_abort(&parsed, XLogRecGetXid(record));
+ }
+ else
+ {
+ Assert(TransactionIdIsValid(parsed.twophase_xid));
+ xact_redo_abort(&parsed, parsed.twophase_xid);
+ RemoveTwoPhaseFile(parsed.twophase_xid, false);
+ }
}
else if (info == XLOG_XACT_PREPARE)
{
RecreateTwoPhaseFile(XLogRecGetXid(record),
XLogRecGetData(record), XLogRecGetDataLen(record));
}
- else if (info == XLOG_XACT_COMMIT_PREPARED)
- {
- xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
-
- xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr);
- RemoveTwoPhaseFile(xlrec->xid, false);
- }
- else if (info == XLOG_XACT_ABORT_PREPARED)
- {
- xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
-
- xact_redo_abort(&xlrec->arec, xlrec->xid);
- RemoveTwoPhaseFile(xlrec->xid, false);
- }
else if (info == XLOG_XACT_ASSIGNMENT)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
static bool
getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
{
- uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 xact_info = info & XLOG_XACT_OPMASK;
uint8 rmid = XLogRecGetRmid(record);
- if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
- {
- *recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
- return true;
- }
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+ if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED))
{
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
- {
- *recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time;
- return true;
- }
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
+ if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED))
{
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true;
}
- if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
- {
- *recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time;
- return true;
- }
return false;
}
recoveryStopsBefore(XLogReaderState *record)
{
bool stopsHere = false;
- uint8 record_info;
+ uint8 xact_info;
bool isCommit;
TimestampTz recordXtime = 0;
TransactionId recordXid;
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+ xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = XLogRecGetXid(record);
}
- else if (record_info == XLOG_XACT_COMMIT_PREPARED)
+ else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
{
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
isCommit = true;
- recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
+ ParseCommitRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
}
- else if (record_info == XLOG_XACT_ABORT)
+ else if (xact_info == XLOG_XACT_ABORT)
{
isCommit = false;
recordXid = XLogRecGetXid(record);
}
- else if (record_info == XLOG_XACT_ABORT_PREPARED)
+ else if (xact_info == XLOG_XACT_ABORT_PREPARED)
{
- isCommit = false;
- recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ isCommit = true;
+ ParseAbortRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
}
else
return false;
static bool
recoveryStopsAfter(XLogReaderState *record)
{
- uint8 record_info;
+ uint8 info;
+ uint8 xact_info;
uint8 rmid;
TimestampTz recordXtime;
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
rmid = XLogRecGetRmid(record);
/*
* the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
- rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;
}
}
- if (rmid == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED ||
- record_info == XLOG_XACT_ABORT ||
- record_info == XLOG_XACT_ABORT_PREPARED))
+ if (rmid != RM_XACT_ID)
+ return false;
+
+ xact_info = info & XLOG_XACT_OPMASK;
+
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED ||
+ xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
{
TransactionId recordXid;
SetLatestXTime(recordXtime);
/* Extract the XID of the committed/aborted transaction */
- if (record_info == XLOG_XACT_COMMIT_PREPARED)
- recordXid = ((xl_xact_commit_prepared *) XLogRecGetData(record))->xid;
- else if (record_info == XLOG_XACT_ABORT_PREPARED)
- recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
+ if (xact_info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
+ }
+ else if (xact_info == XLOG_XACT_ABORT_PREPARED)
+ {
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
+ xl_xact_parsed_abort parsed;
+
+ ParseAbortRecord(XLogRecGetInfo(record),
+ xlrec,
+ &parsed);
+ recordXid = parsed.twophase_xid;
+ }
else
recordXid = XLogRecGetXid(record);
recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0';
- if (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED)
+ if (xact_info == XLOG_XACT_COMMIT ||
+ xact_info == XLOG_XACT_COMMIT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
- else if (record_info == XLOG_XACT_ABORT ||
- record_info == XLOG_XACT_ABORT_PREPARED)
+ else if (xact_info == XLOG_XACT_ABORT ||
+ xact_info == XLOG_XACT_ABORT_PREPARED)
{
ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s",
static bool
recoveryApplyDelay(XLogReaderState *record)
{
- uint8 record_info;
+ uint8 xact_info;
TimestampTz xtime;
long secs;
int microsecs;
* so there is already opportunity for issues caused by early conflicts on
* standbys.
*/
- record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
- if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
- (record_info == XLOG_XACT_COMMIT_COMPACT ||
- record_info == XLOG_XACT_COMMIT ||
- record_info == XLOG_XACT_COMMIT_PREPARED)))
+ if (XLogRecGetRmid(record) != RM_XACT_ID)
+ return false;
+
+ xact_info = XLogRecGetInfo(record) & XLOG_XACT_COMMIT;
+
+ if (xact_info != XLOG_XACT_COMMIT &&
+ xact_info != XLOG_XACT_COMMIT_PREPARED)
return false;
if (!getRecordTimestamp(record, &xtime))
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msg);
-static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
- TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+ xl_xact_parsed_commit *parsed, TransactionId xid);
+static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
XLogReaderState *r = buf->record;
- uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
/* no point in doing anything yet, data could not be decoded anyway */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
switch (info)
{
case XLOG_XACT_COMMIT:
- {
- xl_xact_commit *xlrec;
- TransactionId *subxacts = NULL;
- SharedInvalidationMessage *invals = NULL;
-
- xlrec = (xl_xact_commit *) XLogRecGetData(r);
-
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
case XLOG_XACT_COMMIT_PREPARED:
{
- xl_xact_commit_prepared *prec;
xl_xact_commit *xlrec;
- TransactionId *subxacts;
- SharedInvalidationMessage *invals = NULL;
-
- /* Prepared commits contain a normal commit record... */
- prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
- xlrec = &prec->crec;
+ xl_xact_parsed_commit parsed;
+ TransactionId xid;
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
- case XLOG_XACT_COMMIT_COMPACT:
- {
- xl_xact_commit_compact *xlrec;
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
+ ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
- xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
- DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
- xlrec->xact_time,
- xlrec->nsubxacts, xlrec->subxacts,
- 0, NULL);
+ DecodeCommit(ctx, buf, &parsed, xid);
break;
}
case XLOG_XACT_ABORT:
- {
- xl_xact_abort *xlrec;
- TransactionId *sub_xids;
-
- xlrec = (xl_xact_abort *) XLogRecGetData(r);
-
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
- DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
- sub_xids, xlrec->nsubxacts);
- break;
- }
case XLOG_XACT_ABORT_PREPARED:
{
- xl_xact_abort_prepared *prec;
xl_xact_abort *xlrec;
- TransactionId *sub_xids;
+ xl_xact_parsed_abort parsed;
+ TransactionId xid;
- /* prepared abort contain a normal commit abort... */
- prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
- xlrec = &prec->arec;
+ xlrec = (xl_xact_abort *) XLogRecGetData(r);
+ ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
- /* r->xl_xid is committed in a separate record */
- DecodeAbort(ctx, buf->origptr, prec->xid,
- sub_xids, xlrec->nsubxacts);
+ DecodeAbort(ctx, buf, &parsed, xid);
break;
}
-
case XLOG_XACT_ASSIGNMENT:
{
xl_xact_assignment *xlrec;
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msgs)
+ xl_xact_parsed_commit *parsed, TransactionId xid)
{
int i;
* transaction's contents, since the various caches need to always be
* consistent.
*/
- if (ninval_msgs > 0)
+ if (parsed->nmsgs > 0)
{
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- ninval_msgs, msgs);
+ parsed->nmsgs, parsed->msgs);
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
- nsubxacts, sub_xids);
+ parsed->nsubxacts, parsed->subxacts);
/* ----
* Check whether we are interested in this specific transaction, and tell
* ---
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (dboid != InvalidOid && dboid != ctx->slot->data.database))
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database))
{
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
- sub_xids++;
+ ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
}
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
}
/* tell the reorderbuffer about the surviving subtransactions */
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
buf->origptr, buf->endptr);
- sub_xids++;
}
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time);
+ parsed->xact_time);
}
/*
* snapbuild.c and reorderbuffer.c
*/
static void
-DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
- TransactionId *sub_xids, int nsubxacts)
+DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid)
{
int i;
- SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
+ SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
+ parsed->nsubxacts, parsed->subxacts);
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
- sub_xids++;
+ ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
+ buf->record->EndRecPtr);
}
- ReorderBufferAbort(ctx->reorder, xid, lsn);
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
}
/*
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "storage/relfilenode.h"
+#include "storage/sinval.h"
#include "utils/datetime.h"
*/
/*
- * XLOG allows to store some information in high 4 bits of log
- * record xl_info field
+ * XLOG allows to store some information in high 4 bits of log record xl_info
+ * field. We use 3 for the opcode, and one about an optional flag variable.
*/
#define XLOG_XACT_COMMIT 0x00
#define XLOG_XACT_PREPARE 0x10
#define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50
-#define XLOG_XACT_COMMIT_COMPACT 0x60
+/* free opcode 0x60 */
+/* free opcode 0x70 */
+
+/* mask for filtering opcodes out of xl_info */
+#define XLOG_XACT_OPMASK 0x70
+
+/* does this record have a 'xinfo' field or not */
+#define XLOG_XACT_HAS_INFO 0x80
+
+/*
+ * The following flags, stored in xinfo, determine which information is
+ * contained in commit/abort records.
+ */
+#define XACT_XINFO_HAS_DBINFO (1U << 0)
+#define XACT_XINFO_HAS_SUBXACTS (1U << 1)
+#define XACT_XINFO_HAS_RELFILENODES (1U << 2)
+#define XACT_XINFO_HAS_INVALS (1U << 3)
+#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
+
+/*
+ * Also stored in xinfo, these indicating a variety of additional actions that
+ * need to occur when emulating transaction effects during recovery.
+ *
+ * They are named XactCompletion... to differentiate them from
+ * EOXact... routines which run at the end of the original transaction
+ * completion.
+ */
+#define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30)
+#define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31)
+
+/* Access macros for above flags */
+#define XactCompletionRelcacheInitFileInval(xinfo) \
+ (!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE))
+#define XactCompletionForceSyncCommit(xinfo) \
+ (!!(xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT))
typedef struct xl_xact_assignment
{
#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
-typedef struct xl_xact_commit_compact
+/*
+ * Commit and abort records can contain a lot of information. But a large
+ * portion of the records won't need all possible pieces of information. So we
+ * only include what's needed.
+ *
+ * A minimal commit/abort record only consists out of a xl_xact_commit/abort
+ * struct. The presence of additional information is indicated by bits set in
+ * 'xl_xact_xinfo->xinfo'. The presence of the xinfo field itself is signalled
+ * by a set XLOG_XACT_HAS_INFO bit in the xl_info field.
+ *
+ * NB: All the individual data chunks should be be sized to multiples of
+ * sizeof(int) and only require int32 alignment.
+ */
+
+/* sub-records for commit/abort */
+
+typedef struct xl_xact_xinfo
+{
+ /*
+ * Even though we right now only require 1 byte of space in xinfo we use
+ * four so following records don't have to care about alignment. Commit
+ * records can be large, so copying large portions isn't attractive.
+ */
+ uint32 xinfo;
+} xl_xact_xinfo;
+
+typedef struct xl_xact_dbinfo
+{
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
+} xl_xact_dbinfo;
+
+typedef struct xl_xact_subxacts
{
- TimestampTz xact_time; /* time of commit */
int nsubxacts; /* number of subtransaction XIDs */
- /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
TransactionId subxacts[FLEXIBLE_ARRAY_MEMBER];
-} xl_xact_commit_compact;
+} xl_xact_subxacts;
+#define MinSizeOfXactSubxacts offsetof(xl_xact_subxacts, subxacts)
-#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
+typedef struct xl_xact_relfilenodes
+{
+ int nrels; /* number of subtransaction XIDs */
+ RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_relfilenodes;
+#define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes)
-typedef struct xl_xact_commit
+typedef struct xl_xact_invals
{
- TimestampTz xact_time; /* time of commit */
- uint32 xinfo; /* info flags */
- int nrels; /* number of RelFileNodes */
- int nsubxacts; /* number of subtransaction XIDs */
int nmsgs; /* number of shared inval msgs */
- Oid dbId; /* MyDatabaseId */
- Oid tsId; /* MyDatabaseTableSpace */
- /* Array of RelFileNode(s) to drop at commit */
- RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
- /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
- /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
-} xl_xact_commit;
+ SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_invals;
+#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
-#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
+typedef struct xl_xact_twophase
+{
+ TransactionId xid;
+} xl_xact_twophase;
+#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
-/*
- * These flags are set in the xinfo fields of WAL commit records,
- * indicating a variety of additional actions that need to occur
- * when emulating transaction effects during recovery.
- * They are named XactCompletion... to differentiate them from
- * EOXact... routines which run at the end of the original
- * transaction completion.
- */
-#define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01
-#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
+typedef struct xl_xact_commit
+{
+ TimestampTz xact_time; /* time of commit */
-/* Access macros for above flags */
-#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
-#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
+ /* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
+ /* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
+ /* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
+ /* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
+ /* xl_xact_invals follows if XINFO_HAS_INVALS */
+ /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
+} xl_xact_commit;
+#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
typedef struct xl_xact_abort
{
TimestampTz xact_time; /* time of abort */
- int nrels; /* number of RelFileNodes */
- int nsubxacts; /* number of subtransaction XIDs */
- /* Array of RelFileNode(s) to drop at abort */
- RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
- /* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
-} xl_xact_abort;
-/* Note the intentional lack of an invalidation message array c.f. commit */
-
-#define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes)
+ /* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
+ /* No db_info required */
+ /* xl_xact_subxacts follows if HAS_SUBXACT */
+ /* xl_xact_relfilenodes follows if HAS_RELFILENODES */
+ /* No invalidation messages needed. */
+ /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
+} xl_xact_abort;
+#define MinSizeOfXactAbort sizeof(xl_xact_abort)
/*
- * COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records
- * except that we have to store the XID of the prepared transaction explicitly
- * --- the XID in the record header will be invalid.
+ * Commit/Abort records in the above form are a bit verbose to parse, so
+ * there's a deconstructed versions generated by ParseCommit/AbortRecord() for
+ * easier consumption.
*/
-
-typedef struct xl_xact_commit_prepared
+typedef struct xl_xact_parsed_commit
{
- TransactionId xid; /* XID of prepared xact */
- xl_xact_commit crec; /* COMMIT record */
- /* MORE DATA FOLLOWS AT END OF STRUCT */
-} xl_xact_commit_prepared;
+ TimestampTz xact_time;
+
+ uint32 xinfo;
+
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
-#define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes)
+ int nsubxacts;
+ TransactionId *subxacts;
-typedef struct xl_xact_abort_prepared
+ int nrels;
+ RelFileNode *xnodes;
+
+ int nmsgs;
+ SharedInvalidationMessage *msgs;
+
+ TransactionId twophase_xid; /* only for 2PC */
+} xl_xact_parsed_commit;
+
+typedef struct xl_xact_parsed_abort
{
- TransactionId xid; /* XID of prepared xact */
- xl_xact_abort arec; /* ABORT record */
- /* MORE DATA FOLLOWS AT END OF STRUCT */
-} xl_xact_abort_prepared;
+ TimestampTz xact_time;
+ uint32 xinfo;
+
+ int nsubxacts;
+ TransactionId *subxacts;
-#define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes)
+ int nrels;
+ RelFileNode *xnodes;
+
+ TransactionId twophase_xid; /* only for 2PC */
+} xl_xact_parsed_abort;
/* ----------------
extern int xactGetCommittedChildren(TransactionId **ptr);
+extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInval, bool forceSync,
+ TransactionId twophase_xid);
+
+extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
+ int nsubxacts, TransactionId *subxacts,
+ int nrels, RelFileNode *rels,
+ TransactionId twophase_xid);
extern void xact_redo(XLogReaderState *record);
+
+/* xactdesc.c */
extern void xact_desc(StringInfo buf, XLogReaderState *record);
extern const char *xact_identify(uint8 info);
+/* also in xactdesc.c, so they can be shared between front/backend code */
+extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
+extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
+
#endif /* XACT_H */
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD082 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD083 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{