Back-patch log_newpage_range().
authorNoah Misch <noah@leadboat.com>
Sat, 21 Mar 2020 16:38:33 +0000 (09:38 -0700)
committerNoah Misch <noah@leadboat.com>
Sat, 21 Mar 2020 16:38:33 +0000 (09:38 -0700)
Back-patch a subset of commit 9155580fd5fc2a0cbb23376dfca7cd21f59c2c7b
to v11, v10, 9.6, and 9.5.  Include the latest repairs to this function.
Use a new XLOG_FPI_MULTI value instead of reusing XLOG_FPI.  That way,
if an older server reads WAL from this function, that server will PANIC
instead of applying just one page of the record.  The next commit adds a
call to this function.

Discussion: https://postgr.es/m/20200304.162919.898938381201316571.horikyota.ntt@gmail.com

src/backend/access/rmgrdesc/xlogdesc.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xloginsert.c
src/backend/replication/logical/decode.c
src/include/access/xloginsert.h
src/include/catalog/pg_control.h

index 00741c7b09ecd1a68a348f7913cb16bc6e6c3654..7cbce3783b1c3fb2ad22e0db57d68db2688c082f 100644 (file)
@@ -78,7 +78,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 
        appendStringInfoString(buf, xlrec->rp_name);
    }
-   else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT)
+   else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
+            info == XLOG_FPI_MULTI)
    {
        /* no further information to print */
    }
@@ -182,6 +183,9 @@ xlog_identify(uint8 info)
        case XLOG_FPI_FOR_HINT:
            id = "FPI_FOR_HINT";
            break;
+       case XLOG_FPI_MULTI:
+           id = "FPI_MULTI";
+           break;
    }
 
    return id;
index 40eb77425e8e1ceed92529788500ec8b92497377..62f87ed1741d76f13673917ac8c65498f4644015 100644 (file)
@@ -9754,7 +9754,7 @@ xlog_redo(XLogReaderState *record)
 
    /* in XLOG rmgr, backup blocks are only used by XLOG_FPI records */
    Assert(info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
-          !XLogRecHasAnyBlockRefs(record));
+          info == XLOG_FPI_MULTI || !XLogRecHasAnyBlockRefs(record));
 
    if (info == XLOG_NEXTOID)
    {
@@ -9957,14 +9957,16 @@ xlog_redo(XLogReaderState *record)
    {
        /* nothing to do here */
    }
-   else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT)
+   else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
+            info == XLOG_FPI_MULTI)
    {
-       Buffer      buffer;
+       uint8       block_id;
 
        /*
         * Full-page image (FPI) records contain nothing else but a backup
-        * block. The block reference must include a full-page image -
-        * otherwise there would be no point in this record.
+        * block (or multiple backup blocks). Every block reference must
+        * include a full-page image - otherwise there would be no point in
+        * this record.
         *
         * No recovery conflicts are generated by these generic records - if a
         * resource manager needs to generate conflicts, it has to define a
@@ -9976,9 +9978,14 @@ xlog_redo(XLogReaderState *record)
         * XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
         * code just to distinguish them for statistics purposes.
         */
-       if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
-           elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block");
-       UnlockReleaseBuffer(buffer);
+       for (block_id = 0; block_id <= record->max_block_id; block_id++)
+       {
+           Buffer      buffer;
+
+           if (XLogReadBufferForRedo(record, block_id, &buffer) != BLK_RESTORED)
+               elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block");
+           UnlockReleaseBuffer(buffer);
+       }
    }
    else if (info == XLOG_BACKUP_END)
    {
index 34d4db42977e74f3d55987828696503d31e99b3d..c033e7bd4cf21b4e95249194857d18bab5664b24 100644 (file)
@@ -1021,6 +1021,94 @@ log_newpage_buffer(Buffer buffer, bool page_std)
    return log_newpage(&rnode, forkNum, blkno, page, page_std);
 }
 
+/*
+ * WAL-log a range of blocks in a relation.
+ *
+ * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
+ * written to the WAL. If the range is large, this is done in multiple WAL
+ * records.
+ *
+ * If all page follows the standard page layout, with a PageHeader and unused
+ * space between pd_lower and pd_upper, set 'page_std' to true. That allows
+ * the unused space to be left out from the WAL records, making them smaller.
+ *
+ * NOTE: This function acquires exclusive-locks on the pages. Typically, this
+ * is used on a newly-built relation, and the caller is holding a
+ * AccessExclusiveLock on it, so no other backend can be accessing it at the
+ * same time. If that's not the case, you must ensure that this does not
+ * cause a deadlock through some other means.
+ */
+void
+log_newpage_range(Relation rel, ForkNumber forkNum,
+                 BlockNumber startblk, BlockNumber endblk,
+                 bool page_std)
+{
+   int         flags;
+   BlockNumber blkno;
+
+   flags = REGBUF_FORCE_IMAGE;
+   if (page_std)
+       flags |= REGBUF_STANDARD;
+
+   /*
+    * Iterate over all the pages in the range. They are collected into
+    * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
+    * for each batch.
+    */
+   XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
+
+   blkno = startblk;
+   while (blkno < endblk)
+   {
+       Buffer      bufpack[XLR_MAX_BLOCK_ID];
+       XLogRecPtr  recptr;
+       int         nbufs;
+       int         i;
+
+       CHECK_FOR_INTERRUPTS();
+
+       /* Collect a batch of blocks. */
+       nbufs = 0;
+       while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
+       {
+           Buffer      buf = ReadBufferExtended(rel, forkNum, blkno,
+                                                RBM_NORMAL, NULL);
+
+           LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+           /*
+            * Completely empty pages are not WAL-logged. Writing a WAL record
+            * would change the LSN, and we don't want that. We want the page
+            * to stay empty.
+            */
+           if (!PageIsNew(BufferGetPage(buf)))
+               bufpack[nbufs++] = buf;
+           else
+               UnlockReleaseBuffer(buf);
+           blkno++;
+       }
+
+       /* Write WAL record for this batch. */
+       XLogBeginInsert();
+
+       START_CRIT_SECTION();
+       for (i = 0; i < nbufs; i++)
+       {
+           XLogRegisterBuffer(i, bufpack[i], flags);
+           MarkBufferDirty(bufpack[i]);
+       }
+
+       recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_MULTI);
+
+       for (i = 0; i < nbufs; i++)
+       {
+           PageSetLSN(BufferGetPage(bufpack[i]), recptr);
+           UnlockReleaseBuffer(bufpack[i]);
+       }
+       END_CRIT_SECTION();
+   }
+}
+
 /*
  * Allocate working buffers needed for WAL record construction.
  */
index caa6edda18e982064604547205e0b45e714148e7..d7da3d6c948df4902e353bdf208d5b23fb436d5d 100644 (file)
@@ -199,6 +199,7 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
        case XLOG_FPW_CHANGE:
        case XLOG_FPI_FOR_HINT:
        case XLOG_FPI:
+       case XLOG_FPI_MULTI:
            break;
        default:
            elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
index fa62f915afcf2313e894dba63678ccd0bee1ca13..d8e20109be8e1d93c96bfed13aea41f67b07c280 100644 (file)
@@ -16,6 +16,7 @@
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/relfilenode.h"
+#include "utils/relcache.h"
 
 /*
  * The minimum size of the WAL construction working area. If you need to
@@ -54,6 +55,8 @@ extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
 extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
            BlockNumber blk, char *page, bool page_std);
 extern XLogRecPtr log_newpage_buffer(Buffer buffer, bool page_std);
+extern void log_newpage_range(Relation rel, ForkNumber forkNum,
+                 BlockNumber startblk, BlockNumber endblk, bool page_std);
 extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
 
 extern void InitXLogInsert(void);
index 773d9e6ebae967829f1a651bef17730b3984f149..358430249f6b59a126c5ace55e20c1b373d0969d 100644 (file)
@@ -76,6 +76,7 @@ typedef struct CheckPoint
 #define XLOG_END_OF_RECOVERY           0x90
 #define XLOG_FPI_FOR_HINT              0xA0
 #define XLOG_FPI                       0xB0
+#define XLOG_FPI_MULTI                 0xC0
 
 
 /*