Read WAL directly from WAL buffers.
authorJeff Davis <jdavis@postgresql.org>
Mon, 12 Feb 2024 18:36:18 +0000 (10:36 -0800)
committerJeff Davis <jdavis@postgresql.org>
Mon, 12 Feb 2024 19:11:22 +0000 (11:11 -0800)
If available, read directly from WAL buffers, avoiding the need to go
through the filesystem. Only for physical replication for now, but can
be expanded to other callers.

In preparation for replicating unflushed WAL data.

Author: Bharath Rupireddy
Discussion: https://postgr.es/m/CALj2ACXKKK%3DwbiG5_t6dGao5GoecMwRkhr7GjVBM_jg54%2BNa%3DQ%40mail.gmail.com
Reviewed-by: Andres Freund, Alvaro Herrera, Nathan Bossart, Dilip Kumar, Nitin Jadhav, Melih Mutlu, Kyotaro Horiguchi
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogreader.c
src/backend/replication/walsender.c
src/include/access/xlog.h

index 478377c4a239e5ff2c61b0af16190bc692c8621e..4e14c242b15fe6d3a3cb5020ec8db882a79474bd 100644 (file)
@@ -1705,6 +1705,126 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
    return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Read WAL data directly from WAL buffers, if available. Returns the number
+ * of bytes read successfully.
+ *
+ * Fewer than 'count' bytes may be read if some of the requested WAL data has
+ * already been evicted from the WAL buffers, or if the caller requests data
+ * that is not yet available.
+ *
+ * No locks are taken.
+ *
+ * The 'tli' argument is only used as a convenient safety check so that
+ * callers do not read from WAL buffers on a historical timeline.
+ */
+Size
+WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
+                  TimeLineID tli)
+{
+   char       *pdst = dstbuf;
+   XLogRecPtr  recptr = startptr;
+   XLogRecPtr  upto;
+   Size        nbytes;
+
+   if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
+       return 0;
+
+   Assert(!XLogRecPtrIsInvalid(startptr));
+
+   /*
+    * Don't read past the available WAL data.
+    *
+    * Check using local copy of LogwrtResult. Ordinarily it's been updated by
+    * the caller when determining how far to read; but if not, it just means
+    * we'll read less data.
+    *
+    * XXX: the available WAL could be extended to the WAL insert pointer by
+    * calling WaitXLogInsertionsToFinish().
+    */
+   upto = Min(startptr + count, LogwrtResult.Write);
+   nbytes = upto - startptr;
+
+   /*
+    * Loop through the buffers without a lock. For each buffer, atomically
+    * read and verify the end pointer, then copy the data out, and finally
+    * re-read and re-verify the end pointer.
+    *
+    * Once a page is evicted, it never returns to the WAL buffers, so if the
+    * end pointer matches the expected end pointer before and after we copy
+    * the data, then the right page must have been present during the data
+    * copy. Read barriers are necessary to ensure that the data copy actually
+    * happens between the two verification steps.
+    *
+    * If either verification fails, we simply terminate the loop and return
+    * with the data that had been already copied out successfully.
+    */
+   while (nbytes > 0)
+   {
+       uint32      offset = recptr % XLOG_BLCKSZ;
+       int         idx = XLogRecPtrToBufIdx(recptr);
+       XLogRecPtr  expectedEndPtr;
+       XLogRecPtr  endptr;
+       const char *page;
+       const char *psrc;
+       Size        npagebytes;
+
+       /*
+        * Calculate the end pointer we expect in the xlblocks array if the
+        * correct page is present.
+        */
+       expectedEndPtr = recptr + (XLOG_BLCKSZ - offset);
+
+       /*
+        * First verification step: check that the correct page is present in
+        * the WAL buffers.
+        */
+       endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+       if (expectedEndPtr != endptr)
+           break;
+
+       /*
+        * The correct page is present (or was at the time the endptr was
+        * read; must re-verify later). Calculate pointer to source data and
+        * determine how much data to read from this page.
+        */
+       page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+       psrc = page + offset;
+       npagebytes = Min(nbytes, XLOG_BLCKSZ - offset);
+
+       /*
+        * Ensure that the data copy and the first verification step are not
+        * reordered.
+        */
+       pg_read_barrier();
+
+       /* data copy */
+       memcpy(pdst, psrc, npagebytes);
+
+       /*
+        * Ensure that the data copy and the second verification step are not
+        * reordered.
+        */
+       pg_read_barrier();
+
+       /*
+        * Second verification step: check that the page we read from wasn't
+        * evicted while we were copying the data.
+        */
+       endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
+       if (expectedEndPtr != endptr)
+           break;
+
+       pdst += npagebytes;
+       recptr += npagebytes;
+       nbytes -= npagebytes;
+   }
+
+   Assert(pdst - dstbuf <= count);
+
+   return pdst - dstbuf;
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
index 7190156f2fe2b17628174171bc698d4f724035cd..74a6b1186699583b99173a1b4ad300fdd33be1c9 100644 (file)
@@ -1500,9 +1500,6 @@ err:
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
  */
 bool
 WALRead(XLogReaderState *state,
index 77c8baa32a4c08c3370854ee7870a6c080895812..146826d5db936598a3c91de18b4360d71e57c46b 100644 (file)
@@ -2966,6 +2966,7 @@ XLogSendPhysical(void)
    Size        nbytes;
    XLogSegNo   segno;
    WALReadError errinfo;
+   Size        rbytes;
 
    /* If requested switch the WAL sender to the stopping state. */
    if (got_STOPPING)
@@ -3181,7 +3182,16 @@ XLogSendPhysical(void)
    enlargeStringInfo(&output_message, nbytes);
 
 retry:
-   if (!WALRead(xlogreader,
+   /* attempt to read WAL from WAL buffers first */
+   rbytes = WALReadFromBuffers(&output_message.data[output_message.len],
+                               startptr, nbytes, xlogreader->seg.ws_tli);
+   output_message.len += rbytes;
+   startptr += rbytes;
+   nbytes -= rbytes;
+
+   /* now read the remaining WAL from WAL file */
+   if (nbytes > 0 &&
+       !WALRead(xlogreader,
                 &output_message.data[output_message.len],
                 startptr,
                 nbytes,
index 301c5fa11fb0182b7578f1de88c9d405e585684f..76787a8267349f693c72167fcbbb0d075b3fa35d 100644 (file)
@@ -252,6 +252,9 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
+                              TimeLineID tli);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */