gistxlogDelete *xldata = (gistxlogDelete *) XLogRecGetData(record);
Buffer buffer;
Page page;
+ OffsetNumber *toDelete = xldata->offsets;
/*
* If we have any conflict processing to do, it must happen before we
* update the page.
{
page = (Page) BufferGetPage(buffer);
- if (XLogRecGetDataLen(record) > SizeOfGistxlogDelete)
- {
- OffsetNumber *todelete;
-
- todelete = (OffsetNumber *) ((char *) xldata + SizeOfGistxlogDelete);
-
- PageIndexMultiDelete(page, todelete, xldata->ntodelete);
- }
+ PageIndexMultiDelete(page, toDelete, xldata->ntodelete);
GistClearPageHasGarbage(page);
GistMarkTuplesDeleted(page);
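
With the offsets now a flexible array member, gistRedoDeleteRecord no longer
has to infer the array's presence from the record length or locate it with
pointer arithmetic. A minimal sketch of the equivalence, using the names from
the hunk above:

    gistxlogDelete *xldata = (gistxlogDelete *) XLogRecGetData(record);

    /* xldata->offsets begins exactly at SizeOfGistxlogDelete, so this
     * aliases what the removed code computed by hand: */
    OffsetNumber *toDelete = xldata->offsets;
    Assert((char *) toDelete == (char *) xldata + SizeOfGistxlogDelete);
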
*/
/* XLOG stuff */
+ xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
xlrec_reuse.locator = rel->rd_locator;
xlrec_reuse.block = blkno;
xlrec_reuse.snapshotConflictHorizon = deleteXid;
gistxlogDelete xlrec;
XLogRecPtr recptr;
+ xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
xlrec.ntodelete = ntodelete;
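
For the flexible array member to line up on replay, the insert side registers
the offset array immediately after the fixed-size header. A sketch of how
gistXLogDelete lays out the record (names as in that function; return value
handling elided):

    XLogBeginInsert();
    /* Fixed header: SizeOfGistxlogDelete ends at offsetof(..., offsets) */
    XLogRegisterData((char *) &xlrec, SizeOfGistxlogDelete);
    /* The offsets follow contiguously and become xldata->offsets on replay */
    XLogRegisterData((char *) todelete, ntodelete * sizeof(OffsetNumber));
    XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
    recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_DELETE);
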
Page page;
XLogRedoAction action;
HashPageOpaque pageopaque;
+ OffsetNumber *toDelete;
xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+ toDelete = xldata->offsets;
/*
* If we have any conflict processing to do, it must happen before we
* update the page.
{
page = (Page) BufferGetPage(buffer);
- if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
- {
- OffsetNumber *unused;
-
- unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);
-
- PageIndexMultiDelete(page, unused, xldata->ntuples);
- }
+ PageIndexMultiDelete(page, toDelete, xldata->ntuples);
/*
* Mark the page as not containing any LP_DEAD items. See comments in
xl_hash_vacuum_one_page xlrec;
XLogRecPtr recptr;
+ xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(hrel);
xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
xlrec.ntuples = ndeletable;
nplans = heap_log_freeze_plan(tuples, ntuples, plans, offsets);
xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
+ xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
xlrec.nplans = nplans;
XLogBeginInsert();
xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
xlrec.flags = vmflags;
+ if (RelationIsAccessibleInLogicalDecoding(rel))
+ xlrec.flags |= VISIBILITYMAP_XLOG_CATALOG_REL;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
BlockNumber blkno;
XLogRedoAction action;
+ Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
+
XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
/*
{
Page vmpage = BufferGetPage(vmbuffer);
Relation reln;
+ uint8 vmbits;
/* initialize the page if it was read as zeros */
if (PageIsNew(vmpage))
PageInit(vmpage, BLCKSZ, 0);
+ /* remove VISIBILITYMAP_XLOG_* */
+ vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
+
/*
* XLogReadBufferForRedoExtended locked the buffer. But
* visibilitymap_set will handle locking itself.
visibilitymap_pin(reln, blkno, &vmbuffer);
visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
- xlrec->snapshotConflictHorizon, xlrec->flags);
+ xlrec->snapshotConflictHorizon, vmbits);
ReleaseBuffer(vmbuffer);
FreeFakeRelcacheEntry(reln);
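
The vmbits masking is what keeps the WAL-only bit out of the map itself: the
visibility map stores just two bits per heap page, so the 0x04 flag has no
slot to occupy. Roughly:

    /* 0x01 ALL_VISIBLE | 0x02 ALL_FROZEN == VISIBILITYMAP_VALID_BITS; four
     * heap pages share each map byte, so writing 0x04 would clobber a
     * neighboring page's bits.  Strip it before visibilitymap_set(): */
    vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
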
xl_heap_prune xlrec;
XLogRecPtr recptr;
+ xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
xlrec.snapshotConflictHorizon = prstate.snapshotConflictHorizon;
xlrec.nredirected = prstate.nredirected;
xlrec.ndead = prstate.ndead;
*/
/* XLOG stuff */
+ xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
xlrec_reuse.locator = rel->rd_locator;
xlrec_reuse.block = blkno;
xlrec_reuse.snapshotConflictHorizon = safexid;
XLogRecPtr recptr;
xl_btree_delete xlrec_delete;
+ xlrec_delete.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
xlrec_delete.snapshotConflictHorizon = snapshotConflictHorizon;
xlrec_delete.ndeleted = ndeletable;
xlrec_delete.nupdated = nupdatable;
spgxlogVacuumRedirect xlrec;
GlobalVisState *vistest;
+ xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
xlrec.nToPlaceholder = 0;
xlrec.snapshotConflictHorizon = InvalidTransactionId;
{
TransactionId snapshotConflictHorizon;
uint16 ntodelete; /* number of deleted offsets */
+ bool isCatalogRel; /* to handle recovery conflict during logical
+ * decoding on standby */
- /* TODELETE OFFSET NUMBER ARRAY FOLLOWS */
+ /* TODELETE OFFSET NUMBERS */
+ OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
} gistxlogDelete;
-#define SizeOfGistxlogDelete (offsetof(gistxlogDelete, ntodelete) + sizeof(uint16))
+#define SizeOfGistxlogDelete offsetof(gistxlogDelete, offsets)
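
Note that the size macro could not keep its old offsetof-plus-sizeof shape:
with typical 4/2/1-byte member sizes, offsetof(gistxlogDelete, isCatalogRel)
+ sizeof(bool) gives 7, while the compiler places the 2-byte-aligned offsets[]
at 8. A worked layout under those assumptions:

    /* snapshotConflictHorizon   offset 0, 4 bytes
     * ntodelete                 offset 4, 2 bytes
     * isCatalogRel              offset 6, 1 byte
     * (padding)                 offset 7, 1 byte (OffsetNumber is 2-aligned)
     * offsets[]                 offset 8  == SizeOfGistxlogDelete
     *
     * offsetof(gistxlogDelete, offsets) tracks that padding automatically. */
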
/*
* Backup Blk 0: If this operation completes a page split, by inserting a
RelFileLocator locator;
BlockNumber block;
FullTransactionId snapshotConflictHorizon;
+ bool isCatalogRel; /* to handle recovery conflict during logical
+ * decoding on standby */
} gistxlogPageReuse;
-#define SizeOfGistxlogPageReuse (offsetof(gistxlogPageReuse, snapshotConflictHorizon) + sizeof(FullTransactionId))
+#define SizeOfGistxlogPageReuse (offsetof(gistxlogPageReuse, isCatalogRel) + sizeof(bool))
extern void gist_redo(XLogReaderState *record);
extern void gist_desc(StringInfo buf, XLogReaderState *record);
typedef struct xl_hash_vacuum_one_page
{
TransactionId snapshotConflictHorizon;
- int ntuples;
+ uint16 ntuples;
+ bool isCatalogRel; /* to handle recovery conflict during logical
+ * decoding on standby */
- /* TARGET OFFSET NUMBERS FOLLOW AT THE END */
+ /* TARGET OFFSET NUMBERS */
+ OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
} xl_hash_vacuum_one_page;
-#define SizeOfHashVacuumOnePage \
- (offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(int))
+#define SizeOfHashVacuumOnePage offsetof(xl_hash_vacuum_one_page, offsets)
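
Narrowing ntuples from int to uint16 is what lets the new flag ride along for
free: the record header stays 8 bytes, and offset counts fit easily in 16 bits
(MaxOffsetNumber is well under 65535). A worked comparison, assuming typical
member sizes:

    /* Before: offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(int)
     *           == 4 + 4 == 8
     * After:  snapshotConflictHorizon (4) + ntuples (2) + isCatalogRel (1)
     *           + 1 byte of padding == 8 == offsetof(..., offsets)
     * so the main record data does not grow at all. */
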
extern void hash_redo(XLogReaderState *record);
extern void hash_desc(StringInfo buf, XLogReaderState *record);
TransactionId snapshotConflictHorizon;
uint16 nredirected;
uint16 ndead;
+ bool isCatalogRel; /* to handle recovery conflict during logical
+ * decoding on standby */
/* OFFSET NUMBERS are in the block reference 0 */
} xl_heap_prune;
-#define SizeOfHeapPrune (offsetof(xl_heap_prune, ndead) + sizeof(uint16))
+#define SizeOfHeapPrune (offsetof(xl_heap_prune, isCatalogRel) + sizeof(bool))
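
Unlike the gist and hash records above, xl_heap_prune needs no flexible array
member: its offset arrays travel with block reference 0, so the main-data
macro simply extends to cover the trailing bool. A sketch of the redo-side
split, assuming the usual block-data retrieval:

    Size          datalen;
    OffsetNumber *redirected;

    /* Main data carries only the counts plus the new flag ... */
    xl_heap_prune *xlrec = (xl_heap_prune *) XLogRecGetData(record);

    /* ... while the offsets arrive with the registered block. */
    redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
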
/*
* The vacuum page record is similar to the prune record, but can only mark
{
TransactionId snapshotConflictHorizon;
uint16 nplans;
+ bool isCatalogRel; /* to handle recovery conflict during logical
+ * decoding on standby */
/*
* In payload of blk 0 : FREEZE PLANS and OFFSET NUMBER ARRAY
*/
} xl_heap_freeze_page;
-#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, nplans) + sizeof(uint16))
+#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, isCatalogRel) + sizeof(bool))
/*
* This is what we need to know about setting a visibility map bit
RelFileLocator locator;
BlockNumber block;
FullTransactionId snapshotConflictHorizon;
+ bool isCatalogRel; /* to handle recovery conflict during logical
+ * decoding on standby */
} xl_btree_reuse_page;
-#define SizeOfBtreeReusePage (sizeof(xl_btree_reuse_page))
+#define SizeOfBtreeReusePage (offsetof(xl_btree_reuse_page, isCatalogRel) + sizeof(bool))
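
Switching this macro from sizeof() to offsetof() is not cosmetic: with a
trailing bool, sizeof() rounds the struct up to its 8-byte alignment and would
log dead padding. A worked layout, assuming a 12-byte RelFileLocator and an
8-byte, 8-aligned FullTransactionId:

    /* locator                   offset  0, 12 bytes
     * block                     offset 12,  4 bytes
     * snapshotConflictHorizon   offset 16,  8 bytes
     * isCatalogRel              offset 24,  1 byte
     *
     * sizeof(xl_btree_reuse_page)                == 32
     * offsetof(..., isCatalogRel) + sizeof(bool) == 25
     * so the offsetof form avoids logging 7 pad bytes per record. */
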
/*
* xl_btree_vacuum and xl_btree_delete records describe deletion of index
TransactionId snapshotConflictHorizon;
uint16 ndeleted;
uint16 nupdated;
+ bool isCatalogRel; /* to handle recovery conflict during logical
+ * decoding on standby */
/*----
* In payload of blk 0 :
*/
} xl_btree_delete;
-#define SizeOfBtreeDelete (offsetof(xl_btree_delete, nupdated) + sizeof(uint16))
+#define SizeOfBtreeDelete (offsetof(xl_btree_delete, isCatalogRel) + sizeof(bool))
/*
* The offsets that appear in xl_btree_update metadata are offsets into the
uint16 nToPlaceholder; /* number of redirects to make placeholders */
OffsetNumber firstPlaceholder; /* first placeholder tuple to remove */
TransactionId snapshotConflictHorizon; /* newest XID of removed redirects */
+ bool isCatalogRel; /* to handle recovery conflict during logical
+ * decoding on standby */
/* offsets of redirect tuples to make placeholders follow */
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
#define VISIBILITYMAP_ALL_FROZEN 0x02
#define VISIBILITYMAP_VALID_BITS 0x03 /* OR of all valid visibilitymap
* flags bits */
+/*
+ * To detect recovery conflicts during logical decoding on a standby, we need
+ * to know if a table is a user catalog table. For that we add an additional
+ * bit into xl_heap_visible.flags, in addition to the above.
+ *
+ * NB: VISIBILITYMAP_XLOG_* may not be passed to visibilitymap_set().
+ */
+#define VISIBILITYMAP_XLOG_CATALOG_REL 0x04
+#define VISIBILITYMAP_XLOG_VALID_BITS (VISIBILITYMAP_VALID_BITS | VISIBILITYMAP_XLOG_CATALOG_REL)
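
With the bit defined, recovery-conflict handling can tell from the WAL record
alone whether a VISIBLE record touched a user catalog table, without any
relcache lookup during replay. A sketch of the consumer side (the local
variable is illustrative, not from the patch):

    /* Any bit outside the XLOG-valid set would indicate a corrupt record: */
    Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);

    /* Was the affected heap a (user) catalog table? */
    bool    onCatalogTable =
        (xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL) != 0;
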
#endif /* VISIBILITYMAPDEFS_H */
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD112 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD113 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
#include "access/tupdesc.h"
#include "access/xlog.h"
+#include "catalog/catalog.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h"
#include "catalog/pg_publication.h"