--- /dev/null
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test CREATE INDEX CONCURRENTLY with concurrent prepared-xact modifications
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+
+use Test::More tests => 6;
+
+my ($node, $result);
+
+#
+# Test set-up
+#
+$node = PostgresNode->new('CIC_2PC_test');
+$node->init;
+$node->append_conf('postgresql.conf', 'max_prepared_transactions = 10');
+$node->append_conf('postgresql.conf', 'lock_timeout = 180000');
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
+$node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
+
+
+#
+# Run 3 overlapping 2PC transactions with CIC
+#
+# We have two concurrent background psql processes: $main_h for INSERTs and
+# $cic_h for CIC. Also, we use non-background psql for some COMMIT PREPARED
+# statements.
+#
+
+my $main_in = '';
+my $main_out = '';
+my $main_timer = IPC::Run::timeout(180);
+
+my $main_h =
+ $node->background_psql('postgres', \$main_in, \$main_out,
+ $main_timer, on_error_stop => 1);
+$main_in .= q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint1
+);
+pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired;
+
+my $cic_in = '';
+my $cic_out = '';
+my $cic_timer = IPC::Run::timeout(180);
+my $cic_h =
+ $node->background_psql('postgres', \$cic_in, \$cic_out,
+ $cic_timer, on_error_stop => 1);
+$cic_in .= q(
+\echo start
+CREATE INDEX CONCURRENTLY idx ON tbl(i);
+);
+pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired;
+
+$main_in .= q(
+PREPARE TRANSACTION 'a';
+);
+
+$main_in .= q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint2
+);
+pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired;
+
+$node->safe_psql('postgres', q(COMMIT PREPARED 'a';));
+
+$main_in .= q(
+PREPARE TRANSACTION 'b';
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint3
+);
+pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired;
+
+$node->safe_psql('postgres', q(COMMIT PREPARED 'b';));
+
+$main_in .= q(
+PREPARE TRANSACTION 'c';
+COMMIT PREPARED 'c';
+);
+$main_h->pump_nb;
+
+$main_h->finish;
+$cic_h->finish;
+
+$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
+is($result, '0', 'bt_index_check after overlapping 2PC');
+
+
+#
+# Server restart shall not change whether prepared xact blocks CIC
+#
+
+$node->safe_psql(
+ 'postgres', q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+PREPARE TRANSACTION 'spans_restart';
+BEGIN;
+CREATE TABLE unused ();
+PREPARE TRANSACTION 'persists_forever';
+));
+$node->restart;
+
+my $reindex_in = '';
+my $reindex_out = '';
+my $reindex_timer = IPC::Run::timeout(180);
+my $reindex_h =
+ $node->background_psql('postgres', \$reindex_in, \$reindex_out,
+ $reindex_timer, on_error_stop => 1);
+$reindex_in .= q(
+\echo start
+DROP INDEX CONCURRENTLY idx;
+CREATE INDEX CONCURRENTLY idx ON tbl(i);
+);
+pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired;
+
+$node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'");
+$reindex_h->finish;
+$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
+is($result, '0', 'bt_index_check after 2PC and restart');
+
+
+#
+# Stress CIC+2PC with pgbench
+#
+
+# Fix broken index first
+$node->safe_psql('postgres', q(REINDEX TABLE tbl;));
+
+# Run background pgbench with CIC. We cannot mix-in this script into single
+# pgbench: CIC will deadlock with itself occasionally.
+my $pgbench_out = '';
+my $pgbench_timer = IPC::Run::timeout(180);
+my $pgbench_h = $node->background_pgbench(
+ '--no-vacuum --client=1 --transactions=100',
+ {
+ '002_pgbench_concurrent_cic' => q(
+ DROP INDEX CONCURRENTLY idx;
+ CREATE INDEX CONCURRENTLY idx ON tbl(i);
+ SELECT bt_index_check('idx',true);
+ )
+ },
+ \$pgbench_out,
+ $pgbench_timer);
+
+# Run pgbench.
+$node->pgbench(
+ '--no-vacuum --client=5 --transactions=100',
+ 0,
+ [qr{actually processed}],
+ [qr{^$}],
+ 'concurrent INSERTs w/ 2PC',
+ {
+ '002_pgbench_concurrent_2pc' => q(
+ BEGIN;
+ INSERT INTO tbl VALUES(0);
+ PREPARE TRANSACTION 'c:client_id';
+ COMMIT PREPARED 'c:client_id';
+ ),
+ '002_pgbench_concurrent_2pc_savepoint' => q(
+ BEGIN;
+ SAVEPOINT s1;
+ INSERT INTO tbl VALUES(0);
+ PREPARE TRANSACTION 'c:client_id';
+ COMMIT PREPARED 'c:client_id';
+ )
+ });
+
+$pgbench_h->pump_nb;
+$pgbench_h->finish();
+$result =
+ ($Config{osname} eq "MSWin32")
+ ? ($pgbench_h->full_results)[0]
+ : $pgbench_h->result(0);
+is($result, 0, "pgbench with CIC works");
+
+# done
+$node->stop;
+done_testing();
proc->pgprocno = gxact->pgprocno;
SHMQueueElemInit(&(proc->links));
proc->waitStatus = PROC_WAIT_STATUS_OK;
- /* We set up the gxact's VXID as InvalidBackendId/XID */
- proc->lxid = (LocalTransactionId) xid;
+ if (LocalTransactionIdIsValid(MyProc->lxid))
+ {
+ /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
+ proc->lxid = MyProc->lxid;
+ proc->backendId = MyBackendId;
+ }
+ else
+ {
+ Assert(AmStartupProcess() || !IsPostmasterEnvironment);
+ /* GetLockConflicts() uses this to specify a wait on the XID */
+ proc->lxid = xid;
+ proc->backendId = InvalidBackendId;
+ }
proc->xid = xid;
Assert(proc->xmin == InvalidTransactionId);
proc->delayChkpt = false;
proc->statusFlags = 0;
proc->pid = 0;
- proc->backendId = InvalidBackendId;
proc->databaseId = databaseid;
proc->roleId = owner;
proc->tempNamespaceId = InvalidOid;
return result;
}
+/*
+ * TwoPhaseGetXidByVirtualXID
+ * Lookup VXID among xacts prepared since last startup.
+ *
+ * (This won't find recovered xacts.) If more than one matches, return any
+ * and set "have_more" to true. To witness multiple matches, a single
+ * BackendId must consume 2^32 LXIDs, with no intervening database restart.
+ */
+TransactionId
+TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
+ bool *have_more)
+{
+ int i;
+ TransactionId result = InvalidTransactionId;
+
+ Assert(VirtualTransactionIdIsValid(vxid));
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGPROC *proc;
+ VirtualTransactionId proc_vxid;
+
+ if (!gxact->valid)
+ continue;
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ GET_VXID_FROM_PGPROC(proc_vxid, *proc);
+ if (VirtualTransactionIdEquals(vxid, proc_vxid))
+ {
+ /* Startup process sets proc->backendId to InvalidBackendId. */
+ Assert(!gxact->inredo);
+
+ if (result != InvalidTransactionId)
+ {
+ *have_more = true;
+ break;
+ }
+ result = gxact->xid;
+ }
+ }
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ return result;
+}
+
/*
* TwoPhaseGetDummyBackendId
* Get the dummy backend ID for prepared transaction specified by XID
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
+ /*
+ * Transfer our locks to a dummy PGPROC. This has to be done before
+ * ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would
+ * conclude "xact already committed or aborted" for our locks.
+ */
+ PostPrepare_Locks(xid);
+
/*
* Let others know about no transaction in progress by me. This has to be
* done *after* the prepared transaction has been marked valid, else
PostPrepare_MultiXact(xid);
- PostPrepare_Locks(xid);
PostPrepare_PredicateLocks(xid);
ResourceOwnerRelease(TopTransactionResourceOwner,
* To do this, obtain the current list of lockers, and wait on their VXIDs
* until they are finished.
*
- * Note we don't try to acquire the locks on the given locktags, only the VXIDs
- * of its lock holders; if somebody grabs a conflicting lock on the objects
- * after we obtained our initial list of lockers, we will not wait for them.
+ * Note we don't try to acquire the locks on the given locktags, only the
+ * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
+ * on the objects after we obtained our initial list of lockers, we will not
+ * wait for them.
*/
void
WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
* The result array is palloc'd and is terminated with an invalid VXID.
* *countp, if not null, is updated to the number of items set.
*
- * Of course, the result could be out of date by the time it's returned,
- * so use of this function has to be thought about carefully.
+ * Of course, the result could be out of date by the time it's returned, so
+ * use of this function has to be thought about carefully. Similarly, a
+ * PGPROC with no "lxid" will be considered non-conflicting regardless of any
+ * lock it holds. Existing callers don't care about a locker after that
+ * locker's pg_xact updates complete. CommitTransaction() clears "lxid" after
+ * pg_xact updates and before releasing locks.
*
* Note we never include the current xact's vxid in the result array,
* since an xact never blocks itself.
}
}
+/*
+ * XactLockForVirtualXact
+ *
+ * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
+ * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid). Unlike those
+ * functions, it assumes "xid" is never a subtransaction and that "xid" is
+ * prepared, committed, or aborted.
+ *
+ * If !TransactionIdIsValid(xid), this locks every prepared XID having been
+ * known as "vxid" before its PREPARE TRANSACTION.
+ */
+static bool
+XactLockForVirtualXact(VirtualTransactionId vxid,
+ TransactionId xid, bool wait)
+{
+ bool more = false;
+
+ /* There is no point to wait for 2PCs if you have no 2PCs. */
+ if (max_prepared_xacts == 0)
+ return true;
+
+ do
+ {
+ LockAcquireResult lar;
+ LOCKTAG tag;
+
+ /* Clear state from previous iterations. */
+ if (more)
+ {
+ xid = InvalidTransactionId;
+ more = false;
+ }
+
+ /* If we have no xid, try to find one. */
+ if (!TransactionIdIsValid(xid))
+ xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
+ if (!TransactionIdIsValid(xid))
+ {
+ Assert(!more);
+ return true;
+ }
+
+ /* Check or wait for XID completion. */
+ SET_LOCKTAG_TRANSACTION(tag, xid);
+ lar = LockAcquire(&tag, ShareLock, false, !wait);
+ if (lar == LOCKACQUIRE_NOT_AVAIL)
+ return false;
+ LockRelease(&tag, ShareLock, false);
+ } while (more);
+
+ return true;
+}
+
/*
* VirtualXactLock
*
- * If wait = true, wait until the given VXID has been released, and then
- * return true.
+ * If wait = true, wait as long as the given VXID or any XID acquired by the
+ * same transaction is still running. Then, return true.
*
- * If wait = false, just check whether the VXID is still running, and return
- * true or false.
+ * If wait = false, just check whether that VXID or one of those XIDs is still
+ * running, and return true or false.
*/
bool
VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
LOCKTAG tag;
PGPROC *proc;
+ TransactionId xid = InvalidTransactionId;
Assert(VirtualTransactionIdIsValid(vxid));
- if (VirtualTransactionIdIsPreparedXact(vxid))
- {
- LockAcquireResult lar;
-
- /*
- * Prepared transactions don't hold vxid locks. The
- * LocalTransactionId is always a normal, locked XID.
- */
- SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId);
- lar = LockAcquire(&tag, ShareLock, false, !wait);
- if (lar != LOCKACQUIRE_NOT_AVAIL)
- LockRelease(&tag, ShareLock, false);
- return lar != LOCKACQUIRE_NOT_AVAIL;
- }
+ if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
+ /* no vxid lock; localTransactionId is a normal, locked XID */
+ return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
*/
proc = BackendIdGetProc(vxid.backendId);
if (proc == NULL)
- return true;
+ return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
/*
* We must acquire this lock before checking the backendId and lxid
*/
LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE);
- /* If the transaction has ended, our work here is done. */
if (proc->backendId != vxid.backendId
|| proc->fpLocalTransactionId != vxid.localTransactionId)
{
+ /* VXID ended */
LWLockRelease(&proc->fpInfoLock);
- return true;
+ return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
}
/*
proc->fpVXIDLock = false;
}
+ /*
+ * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
+ * search. The proc might have assigned this XID but not yet locked it,
+ * in which case the proc will lock this XID before releasing the VXID.
+ * The fpInfoLock critical section excludes VirtualXactLockTableCleanup(),
+ * so we won't save an XID of a different VXID. It doesn't matter whether
+ * we save this before or after setting up the primary lock table entry.
+ */
+ xid = proc->xid;
+
/* Done with proc->fpLockBits */
LWLockRelease(&proc->fpInfoLock);
(void) LockAcquire(&tag, ShareLock, false, false);
LockRelease(&tag, ShareLock, false);
- return true;
+ return XactLockForVirtualXact(vxid, xid, wait);
}
/*
* (XXX is it worth testing likewise for duplicate catcache flush entries?
* Probably not.)
*
+ * Many subsystems own higher-level caches that depend on relcache and/or
+ * catcache, and they register callbacks here to invalidate their caches.
+ * While building a higher-level cache entry, a backend may receive a
+ * callback for the being-built entry or one of its dependencies. This
+ * implies the new higher-level entry would be born stale, and it might
+ * remain stale for the life of the backend. Many caches do not prevent
+ * that. They rely on DDL for can't-miss catalog changes taking
+ * AccessExclusiveLock on suitable objects. (For a change made with less
+ * locking, backends might never read the change.) The relation cache,
+ * however, needs to reflect changes from CREATE INDEX CONCURRENTLY no later
+ * than the beginning of the next transaction. Hence, when a relevant
+ * invalidation callback arrives during a build, relcache.c reattempts that
+ * build. Caches with similar needs could do likewise.
+ *
* If a relcache flush is issued for a system relation that we preload
* from the relcache init file, we must also delete the init file so that
* it will be rebuilt during the next backend restart. The actual work of
extern void AtAbort_Twophase(void);
extern void PostPrepare_Twophase(void);
+extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
+ bool *have_more);
extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held);
extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held);
/*
* Top-level transactions are identified by VirtualTransactionIDs comprising
- * PGPROC fields backendId and lxid. For prepared transactions, the
- * LocalTransactionId is an ordinary XID. These are guaranteed unique over
- * the short term, but will be reused after a database restart or XID
- * wraparound; hence they should never be stored on disk.
+ * PGPROC fields backendId and lxid. For recovered prepared transactions, the
+ * LocalTransactionId is an ordinary XID; LOCKTAG_VIRTUALTRANSACTION never
+ * refers to that kind. These are guaranteed unique over the short term, but
+ * will be reused after a database restart or XID wraparound; hence they
+ * should never be stored on disk.
*
* Note that struct VirtualTransactionId can not be assumed to be atomically
* assignable as a whole. However, type LocalTransactionId is assumed to
#define LocalTransactionIdIsValid(lxid) ((lxid) != InvalidLocalTransactionId)
#define VirtualTransactionIdIsValid(vxid) \
(LocalTransactionIdIsValid((vxid).localTransactionId))
-#define VirtualTransactionIdIsPreparedXact(vxid) \
+#define VirtualTransactionIdIsRecoveredPreparedXact(vxid) \
((vxid).backendId == InvalidBackendId)
#define VirtualTransactionIdEquals(vxid1, vxid2) \
((vxid1).backendId == (vxid2).backendId && \