bufmgr: Support multiple in-progress IOs by using resowner
authorAndres Freund <andres@anarazel.de>
Wed, 5 Apr 2023 20:55:15 +0000 (13:55 -0700)
committerAndres Freund <andres@anarazel.de>
Wed, 5 Apr 2023 21:17:55 +0000 (14:17 -0700)
A future patch will add support for extending relations by multiple blocks at
once. To be concurrency safe, the buffers for those blocks need to be marked
as BM_IO_IN_PROGRESS. Until now we only had infrastructure for recovering from
an IO error for a single buffer. This commit extends that infrastructure to
multiple buffers by using the resource owner infrastructure.

This commit increases the size of the ResourceOwnerData struct, which appears
to have a just about measurable overhead in very extreme workloads. Medium
term we are planning to substantially shrink the size of
ResourceOwnerData. Short term the increase is small enough to not worry about
it for now.

Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/20221029025420.eplyow6k7tgu6he3@awork3.anarazel.de
Discussion: https://postgr.es/m/20221029200025.w7bvlgvamjfo6z44@awork3.anarazel.de

src/backend/access/transam/xact.c
src/backend/postmaster/autovacuum.c
src/backend/postmaster/bgwriter.c
src/backend/postmaster/checkpointer.c
src/backend/postmaster/walwriter.c
src/backend/storage/buffer/bufmgr.c
src/backend/utils/resowner/resowner.c
src/include/storage/bufmgr.h
src/include/utils/resowner_private.h

index 231af52cc92f49aad7a1985daa3b4738f27d0d45..6a837e1539d79cd5ceb338539a3bdf956dc754bc 100644 (file)
@@ -2719,8 +2719,7 @@ AbortTransaction(void)
    pgstat_report_wait_end();
    pgstat_progress_end_command();
 
-   /* Clean up buffer I/O and buffer context locks, too */
-   AbortBufferIO();
+   /* Clean up buffer context locks, too */
    UnlockBuffers();
 
    /* Reset WAL record construction state */
@@ -5080,7 +5079,6 @@ AbortSubTransaction(void)
 
    pgstat_report_wait_end();
    pgstat_progress_end_command();
-   AbortBufferIO();
    UnlockBuffers();
 
    /* Reset WAL record construction state */
index 585d28148cabf30f480e1aa2b2e46ab59cbce538..e9ba0dc17cdb982954548bde983baec57ed05571 100644 (file)
@@ -526,7 +526,6 @@ AutoVacLauncherMain(int argc, char *argv[])
         */
        LWLockReleaseAll();
        pgstat_report_wait_end();
-       AbortBufferIO();
        UnlockBuffers();
        /* this is probably dead code, but let's be safe: */
        if (AuxProcessResourceOwner)
index 9bb47da404d6d6c58675c5d9e030b8541640b02f..caad642ec93b9191c8b5b638123c263764e68fb8 100644 (file)
@@ -167,7 +167,6 @@ BackgroundWriterMain(void)
         */
        LWLockReleaseAll();
        ConditionVariableCancelSleep();
-       AbortBufferIO();
        UnlockBuffers();
        ReleaseAuxProcessResources(false);
        AtEOXact_Buffers(false);
index aaad5c522811134ee41d40b1f279ebf5ad923b0c..ace9893d957674ef5a1a251d949b66555057f8a8 100644 (file)
@@ -271,7 +271,6 @@ CheckpointerMain(void)
        LWLockReleaseAll();
        ConditionVariableCancelSleep();
        pgstat_report_wait_end();
-       AbortBufferIO();
        UnlockBuffers();
        ReleaseAuxProcessResources(false);
        AtEOXact_Buffers(false);
index 513e580c513da70bd7e0589580e8c9b84a3d273d..65e84be39b9d2582e3adf6d63357e32e688fab95 100644 (file)
@@ -163,7 +163,6 @@ WalWriterMain(void)
        LWLockReleaseAll();
        ConditionVariableCancelSleep();
        pgstat_report_wait_end();
-       AbortBufferIO();
        UnlockBuffers();
        ReleaseAuxProcessResources(false);
        AtEOXact_Buffers(false);
index e70cf7a651a282e9937139407f59cb5b2602da2d..af073e9a395a0d7b2cf314d461a8a43b33c31a8e 100644 (file)
@@ -159,10 +159,6 @@ int            checkpoint_flush_after = DEFAULT_CHECKPOINT_FLUSH_AFTER;
 int            bgwriter_flush_after = DEFAULT_BGWRITER_FLUSH_AFTER;
 int            backend_flush_after = DEFAULT_BACKEND_FLUSH_AFTER;
 
-/* local state for StartBufferIO and related functions */
-static BufferDesc *InProgressBuf = NULL;
-static bool IsForInput;
-
 /* local state for LockBufferForCleanup */
 static BufferDesc *PinCountWaitBuf = NULL;
 
@@ -2705,7 +2701,6 @@ InitBufferPoolAccess(void)
 static void
 AtProcExit_Buffers(int code, Datum arg)
 {
-   AbortBufferIO();
    UnlockBuffers();
 
    CheckForBufferLeaks();
@@ -4647,7 +4642,7 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 {
    uint32      buf_state;
 
-   Assert(!InProgressBuf);
+   ResourceOwnerEnlargeBufferIOs(CurrentResourceOwner);
 
    for (;;)
    {
@@ -4671,8 +4666,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
    buf_state |= BM_IO_IN_PROGRESS;
    UnlockBufHdr(buf, buf_state);
 
-   InProgressBuf = buf;
-   IsForInput = forInput;
+   ResourceOwnerRememberBufferIO(CurrentResourceOwner,
+                                 BufferDescriptorGetBuffer(buf));
 
    return true;
 }
@@ -4698,8 +4693,6 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
 {
    uint32      buf_state;
 
-   Assert(buf == InProgressBuf);
-
    buf_state = LockBufHdr(buf);
 
    Assert(buf_state & BM_IO_IN_PROGRESS);
@@ -4711,13 +4704,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
    buf_state |= set_flag_bits;
    UnlockBufHdr(buf, buf_state);
 
-   InProgressBuf = NULL;
+   ResourceOwnerForgetBufferIO(CurrentResourceOwner,
+                               BufferDescriptorGetBuffer(buf));
 
    ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
 }
 
 /*
- * AbortBufferIO: Clean up any active buffer I/O after an error.
+ * AbortBufferIO: Clean up active buffer I/O after an error.
  *
  * All LWLocks we might have held have been released,
  * but we haven't yet released buffer pins, so the buffer is still pinned.
@@ -4726,46 +4720,42 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
  * possible the error condition wasn't related to the I/O.
  */
 void
-AbortBufferIO(void)
+AbortBufferIO(Buffer buf)
 {
-   BufferDesc *buf = InProgressBuf;
+   BufferDesc *buf_hdr = GetBufferDescriptor(buf - 1);
+   uint32      buf_state;
 
-   if (buf)
-   {
-       uint32      buf_state;
+   buf_state = LockBufHdr(buf_hdr);
+   Assert(buf_state & (BM_IO_IN_PROGRESS | BM_TAG_VALID));
 
-       buf_state = LockBufHdr(buf);
-       Assert(buf_state & BM_IO_IN_PROGRESS);
-       if (IsForInput)
-       {
-           Assert(!(buf_state & BM_DIRTY));
+   if (!(buf_state & BM_VALID))
+   {
+       Assert(!(buf_state & BM_DIRTY));
+       UnlockBufHdr(buf_hdr, buf_state);
+   }
+   else
+   {
+       Assert(!(buf_state & BM_DIRTY));
+       UnlockBufHdr(buf_hdr, buf_state);
 
-           /* We'd better not think buffer is valid yet */
-           Assert(!(buf_state & BM_VALID));
-           UnlockBufHdr(buf, buf_state);
-       }
-       else
+       /* Issue notice if this is not the first failure... */
+       if (buf_state & BM_IO_ERROR)
        {
-           Assert(buf_state & BM_DIRTY);
-           UnlockBufHdr(buf, buf_state);
-           /* Issue notice if this is not the first failure... */
-           if (buf_state & BM_IO_ERROR)
-           {
-               /* Buffer is pinned, so we can read tag without spinlock */
-               char       *path;
-
-               path = relpathperm(BufTagGetRelFileLocator(&buf->tag),
-                                  BufTagGetForkNum(&buf->tag));
-               ereport(WARNING,
-                       (errcode(ERRCODE_IO_ERROR),
-                        errmsg("could not write block %u of %s",
-                               buf->tag.blockNum, path),
-                        errdetail("Multiple failures --- write error might be permanent.")));
-               pfree(path);
-           }
+           /* Buffer is pinned, so we can read tag without spinlock */
+           char       *path;
+
+           path = relpathperm(BufTagGetRelFileLocator(&buf_hdr->tag),
+                              BufTagGetForkNum(&buf_hdr->tag));
+           ereport(WARNING,
+                   (errcode(ERRCODE_IO_ERROR),
+                    errmsg("could not write block %u of %s",
+                           buf_hdr->tag.blockNum, path),
+                    errdetail("Multiple failures --- write error might be permanent.")));
+           pfree(path);
        }
-       TerminateBufferIO(buf, false, BM_IO_ERROR);
    }
+
+   TerminateBufferIO(buf_hdr, false, BM_IO_ERROR);
 }
 
 /*
index 19b6241e45d47d1925c6dd5b851c7295590363e6..fccc59b39dd5edcd829584daebbc36747db031ff 100644 (file)
@@ -121,6 +121,7 @@ typedef struct ResourceOwnerData
 
    /* We have built-in support for remembering: */
    ResourceArray bufferarr;    /* owned buffers */
+   ResourceArray bufferioarr;  /* in-progress buffer IO */
    ResourceArray catrefarr;    /* catcache references */
    ResourceArray catlistrefarr;    /* catcache-list pins */
    ResourceArray relrefarr;    /* relcache references */
@@ -441,6 +442,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
    }
 
    ResourceArrayInit(&(owner->bufferarr), BufferGetDatum(InvalidBuffer));
+   ResourceArrayInit(&(owner->bufferioarr), BufferGetDatum(InvalidBuffer));
    ResourceArrayInit(&(owner->catrefarr), PointerGetDatum(NULL));
    ResourceArrayInit(&(owner->catlistrefarr), PointerGetDatum(NULL));
    ResourceArrayInit(&(owner->relrefarr), PointerGetDatum(NULL));
@@ -517,6 +519,24 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 
    if (phase == RESOURCE_RELEASE_BEFORE_LOCKS)
    {
+       /*
+        * Abort failed buffer IO. AbortBufferIO()->TerminateBufferIO() calls
+        * ResourceOwnerForgetBufferIOs(), so we just have to iterate till
+        * there are none.
+        *
+        * Needs to be before we release buffer pins.
+        *
+        * During a commit, there shouldn't be any in-progress IO.
+        */
+       while (ResourceArrayGetAny(&(owner->bufferioarr), &foundres))
+       {
+           Buffer      res = DatumGetBuffer(foundres);
+
+           if (isCommit)
+               elog(PANIC, "lost track of buffer IO on buffer %u", res);
+           AbortBufferIO(res);
+       }
+
        /*
         * Release buffer pins.  Note that ReleaseBuffer will remove the
         * buffer entry from our array, so we just have to iterate till there
@@ -746,6 +766,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 
    /* And it better not own any resources, either */
    Assert(owner->bufferarr.nitems == 0);
+   Assert(owner->bufferioarr.nitems == 0);
    Assert(owner->catrefarr.nitems == 0);
    Assert(owner->catlistrefarr.nitems == 0);
    Assert(owner->relrefarr.nitems == 0);
@@ -775,6 +796,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 
    /* And free the object. */
    ResourceArrayFree(&(owner->bufferarr));
+   ResourceArrayFree(&(owner->bufferioarr));
    ResourceArrayFree(&(owner->catrefarr));
    ResourceArrayFree(&(owner->catlistrefarr));
    ResourceArrayFree(&(owner->relrefarr));
@@ -976,6 +998,44 @@ ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
             buffer, owner->name);
 }
 
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * buffer array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeBufferIOs(ResourceOwner owner)
+{
+   /* We used to allow pinning buffers without a resowner, but no more */
+   Assert(owner != NULL);
+   ResourceArrayEnlarge(&(owner->bufferioarr));
+}
+
+/*
+ * Remember that a buffer IO is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeBufferIOs()
+ */
+void
+ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer)
+{
+   ResourceArrayAdd(&(owner->bufferioarr), BufferGetDatum(buffer));
+}
+
+/*
+ * Forget that a buffer IO is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
+{
+   if (!ResourceArrayRemove(&(owner->bufferioarr), BufferGetDatum(buffer)))
+       elog(PANIC, "buffer IO %d is not owned by resource owner %s",
+            buffer, owner->name);
+}
+
 /*
  * Remember that a Local Lock is owned by a ResourceOwner
  *
index f96dc08021146dabb24d54b3ac1500795a144df3..537b89e8774bffab7bdee9b700806e5a04f79c4f 100644 (file)
@@ -181,7 +181,7 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
 
-extern void AbortBufferIO(void);
+extern void AbortBufferIO(Buffer buffer);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
index 1b1f3181b5459a869b0bf20fcda904c898385266..ae58438ec767d7f1cdd4c14ea546031e6778b301 100644 (file)
@@ -30,6 +30,11 @@ extern void ResourceOwnerEnlargeBuffers(ResourceOwner owner);
 extern void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer);
 extern void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer);
 
+/* support for IO-in-progress management */
+extern void ResourceOwnerEnlargeBufferIOs(ResourceOwner owner);
+extern void ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer);
+extern void ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer);
+
 /* support for local lock management */
 extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
 extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);