Fix CREATE INDEX CONCURRENTLY for the newest prepared transactions.
authorNoah Misch <noah@leadboat.com>
Sun, 24 Oct 2021 01:36:38 +0000 (18:36 -0700)
committerNoah Misch <noah@leadboat.com>
Sun, 24 Oct 2021 01:36:38 +0000 (18:36 -0700)
The purpose of commit 8a54e12a38d1545d249f1402f66c8cde2837d97c was to
fix this, and it sufficed when the PREPARE TRANSACTION completed before
the CIC looked for lock conflicts.  Otherwise, things still broke.  As
before, in a cluster having used CIC while having enabled prepared
transactions, queries that use the resulting index can silently fail to
find rows.  It may be necessary to reindex to recover from past
occurrences; REINDEX CONCURRENTLY suffices.  Fix this for future index
builds by making CIC wait for arbitrarily-recent prepared transactions
and for ordinary transactions that may yet PREPARE TRANSACTION.  As part
of that, have PREPARE TRANSACTION transfer locks to its dummy PGPROC
before it calls ProcArrayClearTransaction().  Back-patch to 9.6 (all
supported versions).

Andrey Borodin, reviewed (in earlier versions) by Andres Freund.

Discussion: https://postgr.es/m/01824242-AA92-4FE9-9BA7-AEBAFFEA3D0C@yandex-team.ru

contrib/amcheck/t/003_cic_2pc.pl [new file with mode: 0644]
src/backend/access/transam/twophase.c
src/backend/access/transam/xact.c
src/backend/storage/lmgr/lmgr.c
src/backend/storage/lmgr/lock.c
src/backend/utils/cache/inval.c
src/include/access/twophase.h
src/include/storage/lock.h

diff --git a/contrib/amcheck/t/003_cic_2pc.pl b/contrib/amcheck/t/003_cic_2pc.pl
new file mode 100644 (file)
index 0000000..803b99a
--- /dev/null
@@ -0,0 +1,188 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test CREATE INDEX CONCURRENTLY with concurrent prepared-xact modifications
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+
+use Test::More tests => 6;
+
+my ($node, $result);
+
+#
+# Test set-up
+#
+$node = PostgresNode->new('CIC_2PC_test');
+$node->init;
+$node->append_conf('postgresql.conf', 'max_prepared_transactions = 10');
+$node->append_conf('postgresql.conf', 'lock_timeout = 180000');
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
+$node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
+
+
+#
+# Run 3 overlapping 2PC transactions with CIC
+#
+# We have two concurrent background psql processes: $main_h for INSERTs and
+# $cic_h for CIC.  Also, we use non-background psql for some COMMIT PREPARED
+# statements.
+#
+
+my $main_in    = '';
+my $main_out   = '';
+my $main_timer = IPC::Run::timeout(180);
+
+my $main_h =
+  $node->background_psql('postgres', \$main_in, \$main_out,
+   $main_timer, on_error_stop => 1);
+$main_in .= q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint1
+);
+pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired;
+
+my $cic_in    = '';
+my $cic_out   = '';
+my $cic_timer = IPC::Run::timeout(180);
+my $cic_h =
+  $node->background_psql('postgres', \$cic_in, \$cic_out,
+   $cic_timer, on_error_stop => 1);
+$cic_in .= q(
+\echo start
+CREATE INDEX CONCURRENTLY idx ON tbl(i);
+);
+pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired;
+
+$main_in .= q(
+PREPARE TRANSACTION 'a';
+);
+
+$main_in .= q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint2
+);
+pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired;
+
+$node->safe_psql('postgres', q(COMMIT PREPARED 'a';));
+
+$main_in .= q(
+PREPARE TRANSACTION 'b';
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint3
+);
+pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired;
+
+$node->safe_psql('postgres', q(COMMIT PREPARED 'b';));
+
+$main_in .= q(
+PREPARE TRANSACTION 'c';
+COMMIT PREPARED 'c';
+);
+$main_h->pump_nb;
+
+$main_h->finish;
+$cic_h->finish;
+
+$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
+is($result, '0', 'bt_index_check after overlapping 2PC');
+
+
+#
+# Server restart shall not change whether prepared xact blocks CIC
+#
+
+$node->safe_psql(
+   'postgres', q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+PREPARE TRANSACTION 'spans_restart';
+BEGIN;
+CREATE TABLE unused ();
+PREPARE TRANSACTION 'persists_forever';
+));
+$node->restart;
+
+my $reindex_in    = '';
+my $reindex_out   = '';
+my $reindex_timer = IPC::Run::timeout(180);
+my $reindex_h =
+  $node->background_psql('postgres', \$reindex_in, \$reindex_out,
+   $reindex_timer, on_error_stop => 1);
+$reindex_in .= q(
+\echo start
+DROP INDEX CONCURRENTLY idx;
+CREATE INDEX CONCURRENTLY idx ON tbl(i);
+);
+pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired;
+
+$node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'");
+$reindex_h->finish;
+$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
+is($result, '0', 'bt_index_check after 2PC and restart');
+
+
+#
+# Stress CIC+2PC with pgbench
+#
+
+# Fix broken index first
+$node->safe_psql('postgres', q(REINDEX TABLE tbl;));
+
+# Run background pgbench with CIC. We cannot mix-in this script into single
+# pgbench: CIC will deadlock with itself occasionally.
+my $pgbench_out   = '';
+my $pgbench_timer = IPC::Run::timeout(180);
+my $pgbench_h     = $node->background_pgbench(
+   '--no-vacuum --client=1 --transactions=100',
+   {
+       '002_pgbench_concurrent_cic' => q(
+           DROP INDEX CONCURRENTLY idx;
+           CREATE INDEX CONCURRENTLY idx ON tbl(i);
+           SELECT bt_index_check('idx',true);
+          )
+   },
+   \$pgbench_out,
+   $pgbench_timer);
+
+# Run pgbench.
+$node->pgbench(
+   '--no-vacuum --client=5 --transactions=100',
+   0,
+   [qr{actually processed}],
+   [qr{^$}],
+   'concurrent INSERTs w/ 2PC',
+   {
+       '002_pgbench_concurrent_2pc' => q(
+           BEGIN;
+           INSERT INTO tbl VALUES(0);
+           PREPARE TRANSACTION 'c:client_id';
+           COMMIT PREPARED 'c:client_id';
+         ),
+       '002_pgbench_concurrent_2pc_savepoint' => q(
+           BEGIN;
+           SAVEPOINT s1;
+           INSERT INTO tbl VALUES(0);
+           PREPARE TRANSACTION 'c:client_id';
+           COMMIT PREPARED 'c:client_id';
+         )
+   });
+
+$pgbench_h->pump_nb;
+$pgbench_h->finish();
+$result =
+    ($Config{osname} eq "MSWin32")
+  ? ($pgbench_h->full_results)[0]
+  : $pgbench_h->result(0);
+is($result, 0, "pgbench with CIC works");
+
+# done
+$node->stop;
+done_testing();
index 2156de187c37fc99804078d8431405b0c0861e86..f6e7fa71d8887554febe5afe7f419e39a027838b 100644 (file)
@@ -459,14 +459,24 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
    proc->pgprocno = gxact->pgprocno;
    SHMQueueElemInit(&(proc->links));
    proc->waitStatus = PROC_WAIT_STATUS_OK;
-   /* We set up the gxact's VXID as InvalidBackendId/XID */
-   proc->lxid = (LocalTransactionId) xid;
+   if (LocalTransactionIdIsValid(MyProc->lxid))
+   {
+       /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
+       proc->lxid = MyProc->lxid;
+       proc->backendId = MyBackendId;
+   }
+   else
+   {
+       Assert(AmStartupProcess() || !IsPostmasterEnvironment);
+       /* GetLockConflicts() uses this to specify a wait on the XID */
+       proc->lxid = xid;
+       proc->backendId = InvalidBackendId;
+   }
    proc->xid = xid;
    Assert(proc->xmin == InvalidTransactionId);
    proc->delayChkpt = false;
    proc->statusFlags = 0;
    proc->pid = 0;
-   proc->backendId = InvalidBackendId;
    proc->databaseId = databaseid;
    proc->roleId = owner;
    proc->tempNamespaceId = InvalidOid;
@@ -846,6 +856,53 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
    return result;
 }
 
+/*
+ * TwoPhaseGetXidByVirtualXID
+ *     Lookup VXID among xacts prepared since last startup.
+ *
+ * (This won't find recovered xacts.)  If more than one matches, return any
+ * and set "have_more" to true.  To witness multiple matches, a single
+ * BackendId must consume 2^32 LXIDs, with no intervening database restart.
+ */
+TransactionId
+TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
+                          bool *have_more)
+{
+   int         i;
+   TransactionId result = InvalidTransactionId;
+
+   Assert(VirtualTransactionIdIsValid(vxid));
+   LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+   for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+   {
+       GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+       PGPROC     *proc;
+       VirtualTransactionId proc_vxid;
+
+       if (!gxact->valid)
+           continue;
+       proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       GET_VXID_FROM_PGPROC(proc_vxid, *proc);
+       if (VirtualTransactionIdEquals(vxid, proc_vxid))
+       {
+           /* Startup process sets proc->backendId to InvalidBackendId. */
+           Assert(!gxact->inredo);
+
+           if (result != InvalidTransactionId)
+           {
+               *have_more = true;
+               break;
+           }
+           result = gxact->xid;
+       }
+   }
+
+   LWLockRelease(TwoPhaseStateLock);
+
+   return result;
+}
+
 /*
  * TwoPhaseGetDummyBackendId
  *     Get the dummy backend ID for prepared transaction specified by XID
index d96881c0de553f86ece6e458564bc3cb62830a03..ca6f6d57d3663bf2d7c04a52e51a42da3c1b3909 100644 (file)
@@ -2506,6 +2506,13 @@ PrepareTransaction(void)
    /* Reset XactLastRecEnd until the next transaction writes something */
    XactLastRecEnd = 0;
 
+   /*
+    * Transfer our locks to a dummy PGPROC.  This has to be done before
+    * ProcArrayClearTransaction().  Otherwise, a GetLockConflicts() would
+    * conclude "xact already committed or aborted" for our locks.
+    */
+   PostPrepare_Locks(xid);
+
    /*
     * Let others know about no transaction in progress by me.  This has to be
     * done *after* the prepared transaction has been marked valid, else
@@ -2545,7 +2552,6 @@ PrepareTransaction(void)
 
    PostPrepare_MultiXact(xid);
 
-   PostPrepare_Locks(xid);
    PostPrepare_PredicateLocks(xid);
 
    ResourceOwnerRelease(TopTransactionResourceOwner,
index cdf2266d6d53ee4e62b1b0a6e87db6bdd424eb77..2db0424ad94c917613b50695a36599987c2e4cab 100644 (file)
@@ -871,9 +871,10 @@ XactLockTableWaitErrorCb(void *arg)
  * To do this, obtain the current list of lockers, and wait on their VXIDs
  * until they are finished.
  *
- * Note we don't try to acquire the locks on the given locktags, only the VXIDs
- * of its lock holders; if somebody grabs a conflicting lock on the objects
- * after we obtained our initial list of lockers, we will not wait for them.
+ * Note we don't try to acquire the locks on the given locktags, only the
+ * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
+ * on the objects after we obtained our initial list of lockers, we will not
+ * wait for them.
  */
 void
 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
index 364654e10603598ee96feb756477f4c9b7adfa7c..c25af7fe090822fd9302218807a054f65ff1a412 100644 (file)
@@ -2900,8 +2900,12 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock)
  * The result array is palloc'd and is terminated with an invalid VXID.
  * *countp, if not null, is updated to the number of items set.
  *
- * Of course, the result could be out of date by the time it's returned,
- * so use of this function has to be thought about carefully.
+ * Of course, the result could be out of date by the time it's returned, so
+ * use of this function has to be thought about carefully.  Similarly, a
+ * PGPROC with no "lxid" will be considered non-conflicting regardless of any
+ * lock it holds.  Existing callers don't care about a locker after that
+ * locker's pg_xact updates complete.  CommitTransaction() clears "lxid" after
+ * pg_xact updates and before releasing locks.
  *
  * Note we never include the current xact's vxid in the result array,
  * since an xact never blocks itself.
@@ -4529,37 +4533,80 @@ VirtualXactLockTableCleanup(void)
    }
 }
 
+/*
+ *     XactLockForVirtualXact
+ *
+ * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
+ * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid).  Unlike those
+ * functions, it assumes "xid" is never a subtransaction and that "xid" is
+ * prepared, committed, or aborted.
+ *
+ * If !TransactionIdIsValid(xid), this locks every prepared XID having been
+ * known as "vxid" before its PREPARE TRANSACTION.
+ */
+static bool
+XactLockForVirtualXact(VirtualTransactionId vxid,
+                      TransactionId xid, bool wait)
+{
+   bool        more = false;
+
+   /* There is no point to wait for 2PCs if you have no 2PCs. */
+   if (max_prepared_xacts == 0)
+       return true;
+
+   do
+   {
+       LockAcquireResult lar;
+       LOCKTAG     tag;
+
+       /* Clear state from previous iterations. */
+       if (more)
+       {
+           xid = InvalidTransactionId;
+           more = false;
+       }
+
+       /* If we have no xid, try to find one. */
+       if (!TransactionIdIsValid(xid))
+           xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
+       if (!TransactionIdIsValid(xid))
+       {
+           Assert(!more);
+           return true;
+       }
+
+       /* Check or wait for XID completion. */
+       SET_LOCKTAG_TRANSACTION(tag, xid);
+       lar = LockAcquire(&tag, ShareLock, false, !wait);
+       if (lar == LOCKACQUIRE_NOT_AVAIL)
+           return false;
+       LockRelease(&tag, ShareLock, false);
+   } while (more);
+
+   return true;
+}
+
 /*
  *     VirtualXactLock
  *
- * If wait = true, wait until the given VXID has been released, and then
- * return true.
+ * If wait = true, wait as long as the given VXID or any XID acquired by the
+ * same transaction is still running.  Then, return true.
  *
- * If wait = false, just check whether the VXID is still running, and return
- * true or false.
+ * If wait = false, just check whether that VXID or one of those XIDs is still
+ * running, and return true or false.
  */
 bool
 VirtualXactLock(VirtualTransactionId vxid, bool wait)
 {
    LOCKTAG     tag;
    PGPROC     *proc;
+   TransactionId xid = InvalidTransactionId;
 
    Assert(VirtualTransactionIdIsValid(vxid));
 
-   if (VirtualTransactionIdIsPreparedXact(vxid))
-   {
-       LockAcquireResult lar;
-
-       /*
-        * Prepared transactions don't hold vxid locks.  The
-        * LocalTransactionId is always a normal, locked XID.
-        */
-       SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId);
-       lar = LockAcquire(&tag, ShareLock, false, !wait);
-       if (lar != LOCKACQUIRE_NOT_AVAIL)
-           LockRelease(&tag, ShareLock, false);
-       return lar != LOCKACQUIRE_NOT_AVAIL;
-   }
+   if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
+       /* no vxid lock; localTransactionId is a normal, locked XID */
+       return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
 
    SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
 
@@ -4573,7 +4620,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
     */
    proc = BackendIdGetProc(vxid.backendId);
    if (proc == NULL)
-       return true;
+       return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
 
    /*
     * We must acquire this lock before checking the backendId and lxid
@@ -4582,12 +4629,12 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
     */
    LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE);
 
-   /* If the transaction has ended, our work here is done. */
    if (proc->backendId != vxid.backendId
        || proc->fpLocalTransactionId != vxid.localTransactionId)
    {
+       /* VXID ended */
        LWLockRelease(&proc->fpInfoLock);
-       return true;
+       return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
    }
 
    /*
@@ -4634,6 +4681,16 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
        proc->fpVXIDLock = false;
    }
 
+   /*
+    * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
+    * search.  The proc might have assigned this XID but not yet locked it,
+    * in which case the proc will lock this XID before releasing the VXID.
+    * The fpInfoLock critical section excludes VirtualXactLockTableCleanup(),
+    * so we won't save an XID of a different VXID.  It doesn't matter whether
+    * we save this before or after setting up the primary lock table entry.
+    */
+   xid = proc->xid;
+
    /* Done with proc->fpLockBits */
    LWLockRelease(&proc->fpInfoLock);
 
@@ -4641,7 +4698,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
    (void) LockAcquire(&tag, ShareLock, false, false);
 
    LockRelease(&tag, ShareLock, false);
-   return true;
+   return XactLockForVirtualXact(vxid, xid, wait);
 }
 
 /*
index dd8586ab4d9169e33ad5d9abcd6103acdbfa5283..777fab49151ec170e625fe8dd51961d2bb557f85 100644 (file)
  * (XXX is it worth testing likewise for duplicate catcache flush entries?
  * Probably not.)
  *
+ * Many subsystems own higher-level caches that depend on relcache and/or
+ * catcache, and they register callbacks here to invalidate their caches.
+ * While building a higher-level cache entry, a backend may receive a
+ * callback for the being-built entry or one of its dependencies.  This
+ * implies the new higher-level entry would be born stale, and it might
+ * remain stale for the life of the backend.  Many caches do not prevent
+ * that.  They rely on DDL for can't-miss catalog changes taking
+ * AccessExclusiveLock on suitable objects.  (For a change made with less
+ * locking, backends might never read the change.)  The relation cache,
+ * however, needs to reflect changes from CREATE INDEX CONCURRENTLY no later
+ * than the beginning of the next transaction.  Hence, when a relevant
+ * invalidation callback arrives during a build, relcache.c reattempts that
+ * build.  Caches with similar needs could do likewise.
+ *
  * If a relcache flush is issued for a system relation that we preload
  * from the relcache init file, we must also delete the init file so that
  * it will be rebuilt during the next backend restart.  The actual work of
index e27e1a8fe8a0d47428b27b5a532f0fffa37dac64..2d758422a79aea14cc147635c90975795708c462 100644 (file)
@@ -34,6 +34,8 @@ extern void TwoPhaseShmemInit(void);
 extern void AtAbort_Twophase(void);
 extern void PostPrepare_Twophase(void);
 
+extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
+                                               bool *have_more);
 extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held);
 extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held);
 
index 9b2a421c32c25e8a69a84194a337d943cd2ffd57..a5286fab893bd62dacb2a1b6eaecb83f8913ebc1 100644 (file)
@@ -47,10 +47,11 @@ extern bool Debug_deadlocks;
 
 /*
  * Top-level transactions are identified by VirtualTransactionIDs comprising
- * PGPROC fields backendId and lxid.  For prepared transactions, the
- * LocalTransactionId is an ordinary XID.  These are guaranteed unique over
- * the short term, but will be reused after a database restart or XID
- * wraparound; hence they should never be stored on disk.
+ * PGPROC fields backendId and lxid.  For recovered prepared transactions, the
+ * LocalTransactionId is an ordinary XID; LOCKTAG_VIRTUALTRANSACTION never
+ * refers to that kind.  These are guaranteed unique over the short term, but
+ * will be reused after a database restart or XID wraparound; hence they
+ * should never be stored on disk.
  *
  * Note that struct VirtualTransactionId can not be assumed to be atomically
  * assignable as a whole.  However, type LocalTransactionId is assumed to
@@ -70,7 +71,7 @@ typedef struct
 #define LocalTransactionIdIsValid(lxid) ((lxid) != InvalidLocalTransactionId)
 #define VirtualTransactionIdIsValid(vxid) \
    (LocalTransactionIdIsValid((vxid).localTransactionId))
-#define VirtualTransactionIdIsPreparedXact(vxid) \
+#define VirtualTransactionIdIsRecoveredPreparedXact(vxid) \
    ((vxid).backendId == InvalidBackendId)
 #define VirtualTransactionIdEquals(vxid1, vxid2) \
    ((vxid1).backendId == (vxid2).backendId && \