* POSTGRES process array code.
*
*
- * This module maintains arrays of the PGPROC and PGXACT structures for all
- * active backends. Although there are several uses for this, the principal
- * one is as a means of determining the set of currently running transactions.
+ * This module maintains arrays of PGPROC substructures, as well as associated
+ * arrays in ProcGlobal, for all active backends. Although there are several
+ * uses for this, the principal one is as a means of determining the set of
+ * currently running transactions.
*
* Because of various subtle race conditions it is critical that a backend
* hold the correct locks while setting or clearing its xid (in
/*
* Highest subxid that has been removed from KnownAssignedXids array to
* prevent overflow; or InvalidTransactionId if none. We track this for
- * similar reasons to tracking overflowing cached subxids in PGXACT
+ * similar reasons to tracking overflowing cached subxids in PGPROC
* entries. Must hold exclusive ProcArrayLock to change this, and shared
* lock to read it.
*/
/* oldest catalog xmin of any replication slot */
TransactionId replication_slot_catalog_xmin;
- /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
+ /* indexes into allProcs[], has PROCARRAY_MAXPROCS entries */
int pgprocnos[FLEXIBLE_ARRAY_MEMBER];
} ProcArrayStruct;
static ProcArrayStruct *procArray;
static PGPROC *allProcs;
-static PGXACT *allPgXact;
/*
* Bookkeeping for tracking emulated transactions in recovery
static TransactionId KnownAssignedXidsGetOldestXmin(void);
static void KnownAssignedXidsDisplay(int trace_level);
static void KnownAssignedXidsReset(void);
-static inline void ProcArrayEndTransactionInternal(PGPROC *proc,
- PGXACT *pgxact, TransactionId latestXid);
+static inline void ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid);
static void ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid);
static void MaintainLatestCompletedXid(TransactionId latestXid);
static void MaintainLatestCompletedXidRecovery(TransactionId latestXid);
}
allProcs = ProcGlobal->allProcs;
- allPgXact = ProcGlobal->allPgXact;
/* Create or attach to the KnownAssignedXids arrays too, if needed */
if (EnableHotStandby)
(arrayP->numProcs - index) * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[index + 1], &ProcGlobal->xids[index],
(arrayP->numProcs - index) * sizeof(*ProcGlobal->xids));
+ memmove(&ProcGlobal->subxidStates[index + 1], &ProcGlobal->subxidStates[index],
+ (arrayP->numProcs - index) * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->vacuumFlags[index + 1], &ProcGlobal->vacuumFlags[index],
(arrayP->numProcs - index) * sizeof(*ProcGlobal->vacuumFlags));
arrayP->pgprocnos[index] = proc->pgprocno;
ProcGlobal->xids[index] = proc->xid;
+ ProcGlobal->subxidStates[index] = proc->subxidStatus;
ProcGlobal->vacuumFlags[index] = proc->vacuumFlags;
arrayP->numProcs++;
MaintainLatestCompletedXid(latestXid);
ProcGlobal->xids[proc->pgxactoff] = 0;
+ ProcGlobal->subxidStates[proc->pgxactoff].overflowed = false;
+ ProcGlobal->subxidStates[proc->pgxactoff].count = 0;
}
else
{
}
Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff] == 0));
+ Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].count == 0));
+ Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].overflowed == false));
ProcGlobal->vacuumFlags[proc->pgxactoff] = 0;
for (index = 0; index < arrayP->numProcs; index++)
(arrayP->numProcs - index - 1) * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[index], &ProcGlobal->xids[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->xids));
+ memmove(&ProcGlobal->subxidStates[index], &ProcGlobal->subxidStates[index + 1],
+ (arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->vacuumFlags[index], &ProcGlobal->vacuumFlags[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->vacuumFlags));
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
- PGXACT *pgxact = &allPgXact[proc->pgprocno];
-
if (TransactionIdIsValid(latestXid))
{
/*
*/
if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
{
- ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
+ ProcArrayEndTransactionInternal(proc, latestXid);
LWLockRelease(ProcArrayLock);
}
else
* estimate of global xmin, but that's OK.
*/
Assert(!TransactionIdIsValid(proc->xid));
+ Assert(proc->subxidStatus.count == 0);
+ Assert(!proc->subxidStatus.overflowed);
proc->lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
proc->delayChkpt = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
- Assert(pgxact->nxids == 0);
- Assert(pgxact->overflowed == false);
-
/* must be cleared with xid/xmin: */
/* avoid unnecessarily dirtying shared cachelines */
if (proc->vacuumFlags & PROC_VACUUM_STATE_MASK)
* We don't do any locking here; caller must handle that.
*/
static inline void
-ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
- TransactionId latestXid)
+ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
{
size_t pgxactoff = proc->pgxactoff;
}
/* Clear the subtransaction-XID cache too while holding the lock */
- pgxact->nxids = 0;
- pgxact->overflowed = false;
+ Assert(ProcGlobal->subxidStates[pgxactoff].count == proc->subxidStatus.count &&
+ ProcGlobal->subxidStates[pgxactoff].overflowed == proc->subxidStatus.overflowed);
+ if (proc->subxidStatus.count > 0 || proc->subxidStatus.overflowed)
+ {
+ ProcGlobal->subxidStates[pgxactoff].count = 0;
+ ProcGlobal->subxidStates[pgxactoff].overflowed = false;
+ proc->subxidStatus.count = 0;
+ proc->subxidStatus.overflowed = false;
+ }
/* Also advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid(latestXid);
while (nextidx != INVALID_PGPROCNO)
{
PGPROC *proc = &allProcs[nextidx];
- PGXACT *pgxact = &allPgXact[nextidx];
- ProcArrayEndTransactionInternal(proc, pgxact, proc->procArrayGroupMemberXid);
+ ProcArrayEndTransactionInternal(proc, proc->procArrayGroupMemberXid);
/* Move to next proc in list. */
nextidx = pg_atomic_read_u32(&proc->procArrayGroupNext);
void
ProcArrayClearTransaction(PGPROC *proc)
{
- PGXACT *pgxact = &allPgXact[proc->pgprocno];
size_t pgxactoff;
/*
Assert(!proc->delayChkpt);
/* Clear the subtransaction-XID cache too */
- pgxact->nxids = 0;
- pgxact->overflowed = false;
+ Assert(ProcGlobal->subxidStates[pgxactoff].count == proc->subxidStatus.count &&
+ ProcGlobal->subxidStates[pgxactoff].overflowed == proc->subxidStatus.overflowed);
+ if (proc->subxidStatus.count > 0 || proc->subxidStatus.overflowed)
+ {
+ ProcGlobal->subxidStates[pgxactoff].count = 0;
+ ProcGlobal->subxidStates[pgxactoff].overflowed = false;
+ proc->subxidStatus.count = 0;
+ proc->subxidStatus.overflowed = false;
+ }
LWLockRelease(ProcArrayLock);
}
{
static TransactionId *xids = NULL;
static TransactionId *other_xids;
+ XidCacheStatus *other_subxidstates;
int nxids = 0;
ProcArrayStruct *arrayP = procArray;
TransactionId topxid;
}
other_xids = ProcGlobal->xids;
+ other_subxidstates = ProcGlobal->subxidStates;
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (size_t pgxactoff = 0; pgxactoff < numProcs; pgxactoff++)
{
int pgprocno;
- PGXACT *pgxact;
PGPROC *proc;
TransactionId pxid;
int pxids;
/*
* Step 2: check the cached child-Xids arrays
*/
- pgprocno = arrayP->pgprocnos[pgxactoff];
- pgxact = &allPgXact[pgprocno];
- pxids = pgxact->nxids;
+ pxids = other_subxidstates[pgxactoff].count;
pg_read_barrier(); /* pairs with barrier in GetNewTransactionId() */
pgprocno = arrayP->pgprocnos[pgxactoff];
proc = &allProcs[pgprocno];
* we hold ProcArrayLock. So we can't miss an Xid that we need to
* worry about.)
*/
- if (pgxact->overflowed)
+ if (other_subxidstates[pgxactoff].overflowed)
xids[nxids++] = pxid;
}
size_t numProcs = arrayP->numProcs;
TransactionId *xip = snapshot->xip;
int *pgprocnos = arrayP->pgprocnos;
+ XidCacheStatus *subxidStates = ProcGlobal->subxidStates;
uint8 *allVacuumFlags = ProcGlobal->vacuumFlags;
/*
*/
if (!suboverflowed)
{
- int pgprocno = pgprocnos[pgxactoff];
- PGXACT *pgxact = &allPgXact[pgprocno];
- if (pgxact->overflowed)
+ if (subxidStates[pgxactoff].overflowed)
suboverflowed = true;
else
{
- int nsubxids = pgxact->nxids;
+ int nsubxids = subxidStates[pgxactoff].count;
if (nsubxids > 0)
{
+ int pgprocno = pgprocnos[pgxactoff];
PGPROC *proc = &allProcs[pgprocno];
pg_read_barrier(); /* pairs with GetNewTransactionId */
*/
for (index = 0; index < arrayP->numProcs; index++)
{
- int pgprocno = arrayP->pgprocnos[index];
- PGXACT *pgxact = &allPgXact[pgprocno];
TransactionId xid;
/* Fetch xid just once - see GetNewTransactionId */
if (TransactionIdPrecedes(xid, oldestRunningXid))
oldestRunningXid = xid;
- if (pgxact->overflowed)
+ if (ProcGlobal->subxidStates[index].overflowed)
suboverflowed = true;
/*
*/
if (!suboverflowed)
{
+ XidCacheStatus *other_subxidstates = ProcGlobal->subxidStates;
+
for (index = 0; index < arrayP->numProcs; index++)
{
int pgprocno = arrayP->pgprocnos[index];
PGPROC *proc = &allProcs[pgprocno];
- PGXACT *pgxact = &allPgXact[pgprocno];
- int nxids;
+ int nsubxids;
/*
* Save subtransaction XIDs. Other backends can't add or remove
* entries while we're holding XidGenLock.
*/
- nxids = pgxact->nxids;
- if (nxids > 0)
+ nsubxids = other_subxidstates[index].count;
+ if (nsubxids > 0)
{
/* barrier not really required, as XidGenLock is held, but ... */
pg_read_barrier(); /* pairs with GetNewTransactionId */
memcpy(&xids[count], (void *) proc->subxids.xids,
- nxids * sizeof(TransactionId));
- count += nxids;
- subcount += nxids;
+ nsubxids * sizeof(TransactionId));
+ count += nsubxids;
+ subcount += nsubxids;
/*
* Top-level XID of a transaction is always less than any of
LWLockRelease(ProcArrayLock);
}
-
-#define XidCacheRemove(i) \
- do { \
- MyProc->subxids.xids[i] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
- pg_write_barrier(); \
- MyPgXact->nxids--; \
- } while (0)
-
/*
* XidCacheRemoveRunningXids
*
{
int i,
j;
+ XidCacheStatus *mysubxidstat;
Assert(TransactionIdIsValid(xid));
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ mysubxidstat = &ProcGlobal->subxidStates[MyProc->pgxactoff];
+
/*
* Under normal circumstances xid and xids[] will be in increasing order,
* as will be the entries in subxids. Scan backwards to avoid O(N^2)
{
TransactionId anxid = xids[i];
- for (j = MyPgXact->nxids - 1; j >= 0; j--)
+ for (j = MyProc->subxidStatus.count - 1; j >= 0; j--)
{
if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
{
- XidCacheRemove(j);
+ MyProc->subxids.xids[j] = MyProc->subxids.xids[MyProc->subxidStatus.count - 1];
+ pg_write_barrier();
+ mysubxidstat->count--;
+ MyProc->subxidStatus.count--;
break;
}
}
* error during AbortSubTransaction. So instead of Assert, emit a
* debug warning.
*/
- if (j < 0 && !MyPgXact->overflowed)
+ if (j < 0 && !MyProc->subxidStatus.overflowed)
elog(WARNING, "did not find subXID %u in MyProc", anxid);
}
- for (j = MyPgXact->nxids - 1; j >= 0; j--)
+ for (j = MyProc->subxidStatus.count - 1; j >= 0; j--)
{
if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
{
- XidCacheRemove(j);
+ MyProc->subxids.xids[j] = MyProc->subxids.xids[MyProc->subxidStatus.count - 1];
+ pg_write_barrier();
+ mysubxidstat->count--;
+ MyProc->subxidStatus.count--;
break;
}
}
/* Ordinarily we should have found it, unless the cache has overflowed */
- if (j < 0 && !MyPgXact->overflowed)
+ if (j < 0 && !MyProc->subxidStatus.overflowed)
elog(WARNING, "did not find subXID %u in MyProc", xid);
/* Also advance global latestCompletedXid while holding the lock */
int IdleInTransactionSessionTimeout = 0;
bool log_lock_waits = false;
-/* Pointer to this process's PGPROC and PGXACT structs, if any */
+/* Pointer to this process's PGPROC struct, if any */
PGPROC *MyProc = NULL;
-PGXACT *MyPgXact = NULL;
/*
* This spinlock protects the freelist of recycled PGPROC structures.
size = add_size(size, mul_size(TotalProcs, sizeof(PGPROC)));
size = add_size(size, sizeof(slock_t));
- size = add_size(size, mul_size(MaxBackends, sizeof(PGXACT)));
- size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGXACT)));
- size = add_size(size, mul_size(max_prepared_xacts, sizeof(PGXACT)));
size = add_size(size, mul_size(TotalProcs, sizeof(*ProcGlobal->xids)));
+ size = add_size(size, mul_size(TotalProcs, sizeof(*ProcGlobal->subxidStates)));
size = add_size(size, mul_size(TotalProcs, sizeof(*ProcGlobal->vacuumFlags)));
return size;
InitProcGlobal(void)
{
PGPROC *procs;
- PGXACT *pgxacts;
int i,
j;
bool found;
/* XXX allProcCount isn't really all of them; it excludes prepared xacts */
ProcGlobal->allProcCount = MaxBackends + NUM_AUXILIARY_PROCS;
- /*
- * Also allocate a separate array of PGXACT structures. This is separate
- * from the main PGPROC array so that the most heavily accessed data is
- * stored contiguously in memory in as few cache lines as possible. This
- * provides significant performance benefits, especially on a
- * multiprocessor system. There is one PGXACT structure for every PGPROC
- * structure.
- */
- pgxacts = (PGXACT *) ShmemAlloc(TotalProcs * sizeof(PGXACT));
- MemSet(pgxacts, 0, TotalProcs * sizeof(PGXACT));
- ProcGlobal->allPgXact = pgxacts;
-
/*
* Allocate arrays mirroring PGPROC fields in a dense manner. See
* PROC_HDR.
ProcGlobal->xids =
(TransactionId *) ShmemAlloc(TotalProcs * sizeof(*ProcGlobal->xids));
MemSet(ProcGlobal->xids, 0, TotalProcs * sizeof(*ProcGlobal->xids));
+ ProcGlobal->subxidStates = (XidCacheStatus *) ShmemAlloc(TotalProcs * sizeof(*ProcGlobal->subxidStates));
+ MemSet(ProcGlobal->subxidStates, 0, TotalProcs * sizeof(*ProcGlobal->subxidStates));
ProcGlobal->vacuumFlags = (uint8 *) ShmemAlloc(TotalProcs * sizeof(*ProcGlobal->vacuumFlags));
MemSet(ProcGlobal->vacuumFlags, 0, TotalProcs * sizeof(*ProcGlobal->vacuumFlags));
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
}
- MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];
/*
* Cross-check that the PGPROC is of the type we expect; if this were not
((volatile PGPROC *) auxproc)->pid = MyProcPid;
MyProc = auxproc;
- MyPgXact = &ProcGlobal->allPgXact[auxproc->pgprocno];
SpinLockRelease(ProcStructLock);