Get rid of WALBufMappingLock
authorAlexander Korotkov <akorotkov@postgresql.org>
Wed, 2 Apr 2025 09:44:24 +0000 (12:44 +0300)
committerAlexander Korotkov <akorotkov@postgresql.org>
Wed, 2 Apr 2025 09:44:24 +0000 (12:44 +0300)
Allow multiple backends to initialize WAL buffers concurrently.  This way
`MemSet((char *) NewPage, 0, XLOG_BLCKSZ);` can run in parallel without
taking a single LWLock in exclusive mode.

The new algorithm works as follows:
 * reserve a page for initialization using XLogCtl->InitializeReserved,
 * ensure the page is written out,
 * once the page is initialized, try to advance XLogCtl->InitializedUpTo and
   signal to waiters using XLogCtl->InitializedUpToCondVar condition
   variable,
 * repeat previous steps until we reserve initialization up to the target
   WAL position,
 * wait until concurrent initialization finishes using a
   XLogCtl->InitializedUpToCondVar.

Now, multiple backends can, in parallel, concurrently reserve pages,
initialize them, and advance XLogCtl->InitializedUpTo to point to the latest
initialized page.

Author: Yura Sokolov <y.sokolov@postgrespro.ru>
Co-authored-by: Alexander Korotkov <aekorotkov@gmail.com>
Reviewed-by: Pavel Borisov <pashkin.elfe@gmail.com>
Reviewed-by: Tomas Vondra <tomas@vondra.me>
Tested-by: Michael Paquier <michael@paquier.xyz>
src/backend/access/transam/xlog.c
src/backend/utils/activity/wait_event_names.txt
src/include/storage/lwlocklist.h

index fc30a52d496aa266a82f556709f01ec044f05c80..ec40c0b7c42b1deb16c38ef2afd253b95baf742f 100644 (file)
@@ -302,11 +302,6 @@ static bool doPageWrites;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
- * It is only held while initializing and changing the mapping.  If the
- * contents of the buffer being replaced haven't been written yet, the mapping
- * lock is released while the write is done, and reacquired afterwards.
- *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
@@ -473,21 +468,37 @@ typedef struct XLogCtlData
    pg_atomic_uint64 logFlushResult;    /* last byte + 1 flushed */
 
    /*
-    * Latest initialized page in the cache (last byte position + 1).
+    * First initialized page in the cache (first byte position).
+    */
+   XLogRecPtr  InitializedFrom;
+
+   /*
+    * Latest reserved for inititalization page in the cache (last byte
+    * position + 1).
     *
-    * To change the identity of a buffer (and InitializedUpTo), you need to
-    * hold WALBufMappingLock.  To change the identity of a buffer that's
+    * To change the identity of a buffer, you need to advance
+    * InitializeReserved first.  To change the identity of a buffer that's
     * still dirty, the old page needs to be written out first, and for that
     * you need WALWriteLock, and you need to ensure that there are no
     * in-progress insertions to the page by calling
     * WaitXLogInsertionsToFinish().
     */
-   XLogRecPtr  InitializedUpTo;
+   pg_atomic_uint64 InitializeReserved;
+
+   /*
+    * Latest initialized page in the cache (last byte position + 1).
+    *
+    * InitializedUpTo is updated after the buffer initialization.  After
+    * update, waiters got notification using InitializedUpToCondVar.
+    */
+   pg_atomic_uint64 InitializedUpTo;
+   ConditionVariable InitializedUpToCondVar;
 
    /*
     * These values do not change after startup, although the pointed-to pages
-    * and xlblocks values certainly do.  xlblocks values are protected by
-    * WALBufMappingLock.
+    * and xlblocks values certainly do.  xlblocks values are changed
+    * lock-free according to the check for the xlog write position and are
+    * accompanied by changes of InitializeReserved and InitializedUpTo.
     */
    char       *pages;          /* buffers for unwritten XLOG pages */
    pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -810,9 +821,9 @@ XLogInsertRecord(XLogRecData *rdata,
     * fullPageWrites from changing until the insertion is finished.
     *
     * Step 2 can usually be done completely in parallel. If the required WAL
-    * page is not initialized yet, you have to grab WALBufMappingLock to
-    * initialize it, but the WAL writer tries to do that ahead of insertions
-    * to avoid that from happening in the critical path.
+    * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
+    * which will ensure it is initialized. But the WAL writer tries to do that
+    * ahead of insertions to avoid that from happening in the critical path.
     *
     *----------
     */
@@ -1991,32 +2002,79 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
    XLogRecPtr  NewPageEndPtr = InvalidXLogRecPtr;
    XLogRecPtr  NewPageBeginPtr;
    XLogPageHeader NewPage;
+   XLogRecPtr  ReservedPtr;
    int         npages pg_attribute_unused() = 0;
 
-   LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
    /*
-    * Now that we have the lock, check if someone initialized the page
-    * already.
+    * We must run the loop below inside the critical section as we expect
+    * XLogCtl->InitializedUpTo to eventually keep up.  The most of callers
+    * already run inside the critical section. Except for WAL writer, which
+    * passed 'opportunistic == true', and therefore we don't perform
+    * operations that could error out.
+    *
+    * Start an explicit critical section anyway though.
+    */
+   Assert(CritSectionCount > 0 || opportunistic);
+   START_CRIT_SECTION();
+
+   /*--
+    * Loop till we get all the pages in WAL buffer before 'upto' reserved for
+    * initialization.  Multiple process can initialize different buffers with
+    * this loop in parallel as following.
+    *
+    * 1. Reserve page for initialization using XLogCtl->InitializeReserved.
+    * 2. Initialize the reserved page.
+    * 3. Attempt to advance XLogCtl->InitializedUpTo,
     */
-   while (upto >= XLogCtl->InitializedUpTo || opportunistic)
+   ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
+   while (upto >= ReservedPtr || opportunistic)
    {
-       nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+       Assert(ReservedPtr % XLOG_BLCKSZ == 0);
 
        /*
-        * Get ending-offset of the buffer page we need to replace (this may
-        * be zero if the buffer hasn't been used yet).  Fall through if it's
-        * already written out.
+        * Get ending-offset of the buffer page we need to replace.
+        *
+        * We don't lookup into xlblocks, but rather calculate position we
+        * must wait to be written. If it was written, xlblocks will have this
+        * position (or uninitialized)
         */
-       OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
-       if (LogwrtResult.Write < OldPageRqstPtr)
+       if (ReservedPtr + XLOG_BLCKSZ > XLogCtl->InitializedFrom + XLOG_BLCKSZ * XLOGbuffers)
+           OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
+       else
+           OldPageRqstPtr = InvalidXLogRecPtr;
+
+       if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
        {
            /*
-            * Nope, got work to do. If we just want to pre-initialize as much
-            * as we can without flushing, give up now.
+            * If we just want to pre-initialize as much as we can without
+            * flushing, give up now.
             */
-           if (opportunistic)
-               break;
+           upto = ReservedPtr - 1;
+           break;
+       }
+
+       /*
+        * Attempt to reserve the page for initialization.  Failure means that
+        * this page got reserved by another process.
+        */
+       if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
+                                           &ReservedPtr,
+                                           ReservedPtr + XLOG_BLCKSZ))
+           continue;
+
+       /*
+        * Wait till page gets correctly initialized up to OldPageRqstPtr.
+        */
+       nextidx = XLogRecPtrToBufIdx(ReservedPtr);
+       while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < OldPageRqstPtr)
+           ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+       ConditionVariableCancelSleep();
+       Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageRqstPtr);
+
+       /* Fall through if it's already written out. */
+       if (LogwrtResult.Write < OldPageRqstPtr)
+       {
+           /* Nope, got work to do. */
 
            /* Advance shared memory write request position */
            SpinLockAcquire(&XLogCtl->info_lck);
@@ -2031,14 +2089,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
            RefreshXLogWriteResult(LogwrtResult);
            if (LogwrtResult.Write < OldPageRqstPtr)
            {
-               /*
-                * Must acquire write lock. Release WALBufMappingLock first,
-                * to make sure that all insertions that we need to wait for
-                * can finish (up to this same position). Otherwise we risk
-                * deadlock.
-                */
-               LWLockRelease(WALBufMappingLock);
-
                WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
                LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2060,9 +2110,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
                    pgWalUsage.wal_buffers_full++;
                    TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
                }
-               /* Re-acquire WALBufMappingLock and retry */
-               LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-               continue;
            }
        }
 
@@ -2070,11 +2117,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
         * Now the next buffer slot is free and we can set it up to be the
         * next output page.
         */
-       NewPageBeginPtr = XLogCtl->InitializedUpTo;
+       NewPageBeginPtr = ReservedPtr;
        NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 
-       Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
-
        NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
        /*
@@ -2138,12 +2183,100 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
         */
        pg_write_barrier();
 
+       /*-----
+        * Update the value of XLogCtl->xlblocks[nextidx] and try to advance
+        * XLogCtl->InitializedUpTo in a lock-less manner.
+        *
+        * First, let's provide a formal proof of the algorithm.  Let it be 'n'
+        * process with the following variables in shared memory:
+        *  f - an array of 'n' boolean flags,
+        *  v - atomic integer variable.
+        *
+        * Also, let
+        *  i - a number of a process,
+        *  j - local integer variable,
+        * CAS(var, oldval, newval) - compare-and-swap atomic operation
+        *                            returning true on success,
+        * write_barrier()/read_barrier() - memory barriers.
+        *
+        * The pseudocode for each process is the following.
+        *
+        *  j := i
+        *  f[i] := true
+        *  write_barrier()
+        *  while CAS(v, j, j + 1):
+        *      j := j + 1
+        *      read_barrier()
+        *      if not f[j]:
+        *          break
+        *
+        * Let's prove that v eventually reaches the value of n.
+        * 1. Prove by contradiction.  Assume v doesn't reach n and stucks
+        *    on k, where k < n.
+        * 2. Process k attempts CAS(v, k, k + 1).  1). If, as we assumed, v
+        *    gets stuck at k, then this CAS operation must fail.  Therefore,
+        *    v < k when process k attempts CAS(v, k, k + 1).
+        * 3. If, as we assumed, v gets stuck at k, then the value k of v
+        *    must be achieved by some process m, where m < k.  The process
+        *    m must observe f[k] == false.  Otherwise, it will later attempt
+        *    CAS(v, k, k + 1) with success.
+        * 4. Therefore, corresponding read_barrier() (while j == k) on
+        *    process m happend before write_barrier() of process k.  But then
+        *    process k attempts CAS(v, k, k + 1) after process m successfully
+        *    incremented v to k, and that CAS operation must succeed.
+        *    That leads to a contradiction.  So, there is no such k (k < n)
+        *    where v gets stuck.  Q.E.D.
+        *
+        * To apply this proof to the code below, we assume
+        * XLogCtl->InitializedUpTo will play the role of v with XLOG_BLCKSZ
+        * granularity.  We also assume setting XLogCtl->xlblocks[nextidx] to
+        * NewPageEndPtr to play the role of setting f[i] to true.  Also, note
+        * that processes can't concurrently map different xlog locations to
+        * the same nextidx because we previously requested that
+        * XLogCtl->InitializedUpTo >= OldPageRqstPtr.  So, a xlog buffer can
+        * be taken for initialization only once the previous initialization
+        * takes effect on XLogCtl->InitializedUpTo.
+        */
+
        pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-       XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+       pg_write_barrier();
+
+       while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+       {
+           NewPageBeginPtr = NewPageEndPtr;
+           NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+           nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+
+           pg_read_barrier();
+
+           if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
+           {
+               /*
+                * Page at nextidx wasn't initialized yet, so we cann't move
+                * InitializedUpto further. It will be moved by backend which
+                * will initialize nextidx.
+                */
+               ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+               break;
+           }
+       }
 
        npages++;
    }
-   LWLockRelease(WALBufMappingLock);
+
+   END_CRIT_SECTION();
+
+   /*
+    * All the pages in WAL buffer before 'upto' were reserved for
+    * initialization.  However, some pages might be reserved by concurrent
+    * processes.  Wait till they finish initialization.
+    */
+   while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
+       ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+   ConditionVariableCancelSleep();
+
+   pg_read_barrier();
 
 #ifdef WAL_DEBUG
    if (XLOG_DEBUG && npages > 0)
@@ -5071,6 +5204,10 @@ XLOGShmemInit(void)
    pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
    pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
    pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+   pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
+   pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
+   ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }
 
 /*
@@ -6090,7 +6227,8 @@ StartupXLOG(void)
        memset(page + len, 0, XLOG_BLCKSZ - len);
 
        pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-       XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+       pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
+       XLogCtl->InitializedFrom = endOfRecoveryInfo->lastPageBeginPtr;
    }
    else
    {
@@ -6099,8 +6237,10 @@ StartupXLOG(void)
         * let the first attempt to insert a log record to initialize the next
         * buffer.
         */
-       XLogCtl->InitializedUpTo = EndOfLog;
+       pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
+       XLogCtl->InitializedFrom = EndOfLog;
    }
+   pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
    /*
     * Update local and shared status.  This is OK to do without any locks
index 4f44648aca8176976e1d746021a700768deb0d07..8bce14c38fdb098db5b301281dda5b42ceff24b0 100644 (file)
@@ -156,6 +156,7 @@ REPLICATION_SLOT_DROP   "Waiting for a replication slot to become inactive so it c
 RESTORE_COMMAND    "Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT  "Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP   "Waiting for confirmation from a remote server during synchronous replication."
+WAL_BUFFER_INIT    "Waiting on WAL buffer to be initialized."
 WAL_RECEIVER_EXIT  "Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START    "Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY  "Waiting for a new WAL summary to be generated."
@@ -314,7 +315,6 @@ XidGen  "Waiting to allocate a new transaction ID."
 ProcArray  "Waiting to access the shared per-process data structures (typically, to get a snapshot or report a session's transaction ID)."
 SInvalRead "Waiting to retrieve messages from the shared catalog invalidation queue."
 SInvalWrite    "Waiting to add a message to the shared catalog invalidation queue."
-WALBufMapping  "Waiting to replace a page in WAL buffers."
 WALWrite   "Waiting for WAL buffers to be written to disk."
 ControlFile    "Waiting to read or update the <filename>pg_control</filename> file or create a new WAL file."
 MultiXactGen   "Waiting to read or update shared multixact state."
index 932024b1b0ba5f1c10f5006aa7b999631b88aa53..a9681738146e1d83e5501fbb752513791464c427 100644 (file)
@@ -37,7 +37,7 @@ PG_LWLOCK(3, XidGen)
 PG_LWLOCK(4, ProcArray)
 PG_LWLOCK(5, SInvalRead)
 PG_LWLOCK(6, SInvalWrite)
-PG_LWLOCK(7, WALBufMapping)
+/* 7 was WALBufMapping */
 PG_LWLOCK(8, WALWrite)
 PG_LWLOCK(9, ControlFile)
 /* 10 was CheckpointLock */