Add a new option to RestoreBkpBlocks() to indicate if a cleanup lock should
authorHeikki Linnakangas <heikki@enterprisedb.com>
Tue, 20 Jan 2009 18:59:37 +0000 (18:59 +0000)
committerHeikki Linnakangas <heikki@enterprisedb.com>
Tue, 20 Jan 2009 18:59:37 +0000 (18:59 +0000)
be used instead of the normal exclusive lock, and make WAL redo functions
responsible for calling RestoreBkpBlocks(). They know better what kind of a
lock they need.

At the moment, this just moves things around with no functional change, but
makes the hot standby patch that's under review cleaner.

15 files changed:
src/backend/access/gin/ginxlog.c
src/backend/access/gist/gistxlog.c
src/backend/access/heap/heapam.c
src/backend/access/nbtree/nbtxlog.c
src/backend/access/transam/clog.c
src/backend/access/transam/multixact.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogutils.c
src/backend/catalog/storage.c
src/backend/commands/dbcommands.c
src/backend/commands/sequence.c
src/backend/commands/tablespace.c
src/backend/storage/freespace/freespace.c
src/include/access/xlog.h

index d2cff392e57316f459adfb0c68e92ec923c441d3..8382576d3c07879f1b9312b44b3eb31d59911886 100644 (file)
@@ -438,6 +438,8 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       RestoreBkpBlocks(lsn, record, false);
+
        topCtx = MemoryContextSwitchTo(opCtx);
        switch (info)
        {
index 116449e3c833bdfc2b6f634b3a1a5f46bbd9f6aa..4a20d905d4e4bb70c404444ec254bac8eeea5128 100644 (file)
@@ -394,9 +394,10 @@ void
 gist_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
-
        MemoryContext oldCxt;
 
+       RestoreBkpBlocks(lsn, record, false);
+
        oldCxt = MemoryContextSwitchTo(opCtx);
        switch (info)
        {
index 4af17c80795031cbf323c3b134bf9ea5d2941dfa..52115cf64e1832679b4762ed4e025ba3be29d1ea 100644 (file)
@@ -4777,6 +4777,8 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       RestoreBkpBlocks(lsn, record, false);
+
        switch (info & XLOG_HEAP_OPMASK)
        {
                case XLOG_HEAP_INSERT:
@@ -4816,12 +4818,15 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
        switch (info & XLOG_HEAP_OPMASK)
        {
                case XLOG_HEAP2_FREEZE:
+                       RestoreBkpBlocks(lsn, record, false);
                        heap_xlog_freeze(lsn, record);
                        break;
                case XLOG_HEAP2_CLEAN:
+                       RestoreBkpBlocks(lsn, record, true);
                        heap_xlog_clean(lsn, record, false);
                        break;
                case XLOG_HEAP2_CLEAN_MOVE:
+                       RestoreBkpBlocks(lsn, record, true);
                        heap_xlog_clean(lsn, record, true);
                        break;
                default:
index a874e491f5096a186e3bac3ab3175c71263f08b7..517c4b90cec9b9c9b9f5604a33108abf1c0e416a 100644 (file)
@@ -714,6 +714,8 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       RestoreBkpBlocks(lsn, record, false);
+
        switch (info)
        {
                case XLOG_BTREE_INSERT_LEAF:
index c80867c4727a4029ed1e0ae0bb46a0c3d2896251..528a219db47444eb331423b9374bd6d000850cde 100644 (file)
@@ -684,6 +684,9 @@ clog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in clog records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == CLOG_ZEROPAGE)
        {
                int                     pageno;
index 78d47154c9b7a667379bc8b875bbea6724a0b7d2..73143411017cf4d9ecff96f39084e807d7448267 100644 (file)
@@ -1870,6 +1870,9 @@ multixact_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in multixact records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == XLOG_MULTIXACT_ZERO_OFF_PAGE)
        {
                int                     pageno;
index 23f6ea0a8b5d8b333421fa11468cc211e96f9abc..c94e2a2251fafdcf642dbbb90a00a44e863054ad 100644 (file)
@@ -4308,6 +4308,9 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in xact records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == XLOG_XACT_COMMIT)
        {
                xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
index 7debb34072601c4606a3842535251ce6f623dd22..bd6035d4a692ac8d15d45498598bb203b9f32790 100644 (file)
@@ -2922,9 +2922,15 @@ CleanupBackupHistory(void)
  * page might not be.  This will force us to replay all subsequent
  * modifications of the page that appear in XLOG, rather than possibly
  * ignoring them as already applied, but that's not a huge drawback.
+ *
+ * If 'cleanup' is true, a cleanup lock is used when restoring blocks.
+ * Otherwise, a normal exclusive lock is used.  At the moment, that's just
+ * pro forma, because there can't be any regular backends in the system
+ * during recovery.  The 'cleanup' argument applies to all backup blocks
+ * in the WAL record, that suffices for now.
  */
-static void
-RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
+void
+RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
 {
        Buffer          buffer;
        Page            page;
@@ -2944,6 +2950,11 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
                buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
                                                                                RBM_ZERO);
                Assert(BufferIsValid(buffer));
+               if (cleanup)
+                       LockBufferForCleanup(buffer);
+               else
+                       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
                page = (Page) BufferGetPage(buffer);
 
                if (bkpb.hole_length == 0)
@@ -5199,9 +5210,6 @@ StartupXLOG(void)
                                        TransactionIdAdvance(ShmemVariableCache->nextXid);
                                }
 
-                               if (record->xl_info & XLR_BKP_BLOCK_MASK)
-                                       RestoreBkpBlocks(record, EndRecPtr);
-
                                RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
 
                                /* Pop the error context stack */
@@ -6233,6 +6241,9 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in xlog records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == XLOG_NEXTOID)
        {
                Oid                     nextOid;
index ef5fe4cef7f0e8c405b258cb134b34c1bd9ca6ef..4a86271019d4b1d56e4fe3f8f6d6b90b577a3e7d 100644 (file)
@@ -217,8 +217,19 @@ XLogCheckInvalidPages(void)
 
 /*
  * XLogReadBuffer
- *             A shorthand of XLogReadBufferExtended(), for reading from the main
- *             fork.
+ *             Read a page during XLOG replay.
+ *
+ * This is a shorthand of XLogReadBufferExtended() followed by
+ * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), for reading from the main
+ * fork.
+ *
+ * (Getting the lock is not really necessary, since we expect that this is
+ * only used during single-process XLOG replay, but some subroutines such
+ * as MarkBufferDirty will complain if we don't. And hopefully we'll get
+ * hot standby support in the future, where there will be backends running
+ * read-only queries during XLOG replay.)
+ *
+ * The returned buffer is exclusively-locked.
  *
  * For historical reasons, instead of a ReadBufferMode argument, this only
  * supports RBM_ZERO (init == true) and RBM_NORMAL (init == false) modes.
@@ -226,22 +237,21 @@ XLogCheckInvalidPages(void)
 Buffer
 XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
 {
-       return XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
-                                                                 init ? RBM_ZERO : RBM_NORMAL);
+       Buffer buf;
+       buf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+                                                                init ? RBM_ZERO : RBM_NORMAL);
+       if (BufferIsValid(buf))
+               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+       return buf;
 }
 
 /*
  * XLogReadBufferExtended
  *             Read a page during XLOG replay
  *
- * This is functionally comparable to ReadBuffer followed by
- * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE): you get back a pinned
- * and locked buffer.  (Getting the lock is not really necessary, since we
- * expect that this is only used during single-process XLOG replay, but
- * some subroutines such as MarkBufferDirty will complain if we don't.)
- *
- * There's some differences in the behavior wrt. the "mode" argument,
- * compared to ReadBufferExtended:
+ * This is functionally comparable to ReadBufferExtended. There's some
+ * differences in the behavior wrt. the "mode" argument:
  *
  * In RBM_NORMAL mode, if the page doesn't exist, or contains all-zeroes, we
  * return InvalidBuffer. In this case the caller should silently skip the
@@ -306,16 +316,19 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
                Assert(BufferGetBlockNumber(buffer) == blkno);
        }
 
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
        if (mode == RBM_NORMAL)
        {
                /* check that page has been initialized */
                Page            page = (Page) BufferGetPage(buffer);
 
+               /*
+                * We assume that PageIsNew is safe without a lock. During recovery,
+                * there should be no other backends that could modify the buffer at
+                * the same time.
+                */
                if (PageIsNew(page))
                {
-                       UnlockReleaseBuffer(buffer);
+                       ReleaseBuffer(buffer);
                        log_invalid_page(rnode, forknum, blkno, true);
                        return InvalidBuffer;
                }
index e9008e3f2cc78eabcd373225c0d7d395c3117d37..309fa469adf7fcfdbee9c904b930435e25f313a0 100644 (file)
@@ -401,6 +401,9 @@ smgr_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in smgr records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == XLOG_SMGR_CREATE)
        {
                xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(record);
index 50cb6780d53a8b84be495257f85d0034b9dd6ec6..7e065762a84856679e5e39609950ebed18251b44 100644 (file)
@@ -1941,6 +1941,9 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in dbase records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == XLOG_DBASE_CREATE)
        {
                xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
index c80f2a669aed612f720668db06cd89e7f43809c5..46d76833771d7ff64c2774adadd14081998157ff 100644 (file)
@@ -1339,6 +1339,9 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
        xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
        sequence_magic *sm;
 
+       /* Backup blocks are not used in seq records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info != XLOG_SEQ_LOG)
                elog(PANIC, "seq_redo: unknown op code %u", info);
 
index 7fa4bfba62d0884b660cb0dd81f6f12726f8b0a2..ee0c6428ef5485b56ae796e5bbe464adcff6a9f7 100644 (file)
@@ -1276,6 +1276,9 @@ tblspc_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in tblspc records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == XLOG_TBLSPC_CREATE)
        {
                xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) XLogRecGetData(record);
index 8ef11be652a129d4cf8724a979497f1745cec15d..7fcd51e63863afa6e41adfad9f804e612fb4f8f4 100644 (file)
@@ -212,6 +212,8 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
 
        /* If the page doesn't exist already, extend */
        buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
+       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
        page = BufferGetPage(buf);
        if (PageIsNew(page))
                PageInit(page, BLCKSZ, 0);
index 5b7374ee667f296437d78819b44926487e8ef912..6913f7c800267f2caeb5438c083042176ac8b81b 100644 (file)
@@ -194,6 +194,8 @@ extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
 
 extern void XLogSetAsyncCommitLSN(XLogRecPtr record);
 
+extern void RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup);
+
 extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
 extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);