</listitem>
</varlistentry>
+ <varlistentry id="guc-log-lock-failure" xreflabel="log_lock_failure">
+ <term><varname>log_lock_failure</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>log_lock_failure</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Controls whether a detailed log message is produced
+ when a lock acquisition fails. This is useful for analyzing
+ the causes of lock failures. Currently, only lock failures
+ due to <literal>SELECT NOWAIT</literal> is supported.
+ The default is <literal>off</literal>. Only superusers and
+ users with the appropriate <literal>SET</literal> privilege
+ can change this setting.
+ </para>
+ </listitem>
+ </varlistentry>
+
+
<varlistentry id="guc-log-recovery-conflict-waits" xreflabel="log_recovery_conflict_waits">
<term><varname>log_recovery_conflict_waits</varname> (<type>boolean</type>)
<indexterm>
Relation rel, ItemPointer ctid, XLTW_Oper oper,
int *remaining);
static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status,
- uint16 infomask, Relation rel, int *remaining);
+ uint16 infomask, Relation rel, int *remaining,
+ bool logLockFailure);
static void index_delete_sort(TM_IndexDeleteOp *delstate);
static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
LockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
#define UnlockTupleTuplock(rel, tup, mode) \
UnlockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
-#define ConditionalLockTupleTuplock(rel, tup, mode) \
- ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
+#define ConditionalLockTupleTuplock(rel, tup, mode, log) \
+ ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock, (log))
#ifdef USE_PREFETCH
/*
case LockWaitSkip:
if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
status, infomask, relation,
- NULL))
+ NULL, false))
{
result = TM_WouldBlock;
/* recovery code expects to have buffer lock held */
case LockWaitError:
if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
status, infomask, relation,
- NULL))
+ NULL, log_lock_failure))
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on row in relation \"%s\"",
XLTW_Lock);
break;
case LockWaitSkip:
- if (!ConditionalXactLockTableWait(xwait))
+ if (!ConditionalXactLockTableWait(xwait, false))
{
result = TM_WouldBlock;
/* recovery code expects to have buffer lock held */
}
break;
case LockWaitError:
- if (!ConditionalXactLockTableWait(xwait))
+ if (!ConditionalXactLockTableWait(xwait, log_lock_failure))
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on row in relation \"%s\"",
break;
case LockWaitSkip:
- if (!ConditionalLockTupleTuplock(relation, tid, mode))
+ if (!ConditionalLockTupleTuplock(relation, tid, mode, false))
return false;
break;
case LockWaitError:
- if (!ConditionalLockTupleTuplock(relation, tid, mode))
+ if (!ConditionalLockTupleTuplock(relation, tid, mode, log_lock_failure))
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on row in relation \"%s\"",
* fail if lock is unavailable. 'rel', 'ctid' and 'oper' are used to set up
* context information for error messages. 'remaining', if not NULL, receives
* the number of members that are still running, including any (non-aborted)
- * subtransactions of our own transaction.
+ * subtransactions of our own transaction. 'logLockFailure' indicates whether
+ * to log details when a lock acquisition fails with 'nowait' enabled.
*
* We do this by sleeping on each member using XactLockTableWait. Any
* members that belong to the current backend are *not* waited for, however;
Do_MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
uint16 infomask, bool nowait,
Relation rel, ItemPointer ctid, XLTW_Oper oper,
- int *remaining)
+ int *remaining, bool logLockFailure)
{
bool result = true;
MultiXactMember *members;
*/
if (nowait)
{
- result = ConditionalXactLockTableWait(memxid);
+ result = ConditionalXactLockTableWait(memxid, logLockFailure);
if (!result)
break;
}
int *remaining)
{
(void) Do_MultiXactIdWait(multi, status, infomask, false,
- rel, ctid, oper, remaining);
+ rel, ctid, oper, remaining, false);
}
/*
*/
static bool
ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status,
- uint16 infomask, Relation rel, int *remaining)
+ uint16 infomask, Relation rel, int *remaining,
+ bool logLockFailure)
{
return Do_MultiXactIdWait(multi, status, infomask, true,
- rel, NULL, XLTW_None, remaining);
+ rel, NULL, XLTW_None, remaining, logLockFailure);
}
/*
XLTW_FetchUpdated);
break;
case LockWaitSkip:
- if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+ if (!ConditionalXactLockTableWait(SnapshotDirty.xmax, false))
/* skip instead of waiting */
return TM_WouldBlock;
break;
case LockWaitError:
- if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+ if (!ConditionalXactLockTableWait(SnapshotDirty.xmax, log_lock_failure))
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on row in relation \"%s\"",
SetLocktagRelationOid(&tag, relid);
- res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+ res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
+ false);
/*
* Now that we have the lock, check for invalidation messages, so that we
SetLocktagRelationOid(&tag, relid);
- res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
+ res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
+ false);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
- res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+ res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
+ false);
/*
* Now that we have the lock, check for invalidation messages; see notes
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+ res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
+ false);
/*
* Now that we have the lock, check for invalidation messages; see notes
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
+ res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
+ false);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
* Returns true iff the lock was acquired.
*/
bool
-ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
+ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode,
+ bool logLockFailure)
{
LOCKTAG tag;
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));
- return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
+ return (LockAcquireExtended(&tag, lockmode, false, true, true, NULL,
+ logLockFailure) != LOCKACQUIRE_NOT_AVAIL);
}
/*
* Returns true if the lock was acquired.
*/
bool
-ConditionalXactLockTableWait(TransactionId xid)
+ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
{
LOCKTAG tag;
bool first = true;
SET_LOCKTAG_TRANSACTION(tag, xid);
- if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
+ if (LockAcquireExtended(&tag, ShareLock, false, true, true, NULL,
+ logLockFailure)
+ == LOCKACQUIRE_NOT_AVAIL)
return false;
LockRelease(&tag, ShareLock, false);
objid,
objsubid);
- res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
+ res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
+ false);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
objid,
objsubid);
- res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
+ res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
+ false);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/spin.h"
#include "utils/resowner.h"
-/* This configuration variable is used to set the lock table size */
-int max_locks_per_xact; /* set by guc.c */
+/* GUC variables */
+int max_locks_per_xact; /* used to set the lock table size */
+bool log_lock_failure = false;
#define NLOCKENTS() \
mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
bool dontWait)
{
return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
- true, NULL);
+ true, NULL, false);
}
/*
*
* If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
* table entry if a lock is successfully acquired, or NULL if not.
+ *
+ * logLockFailure indicates whether to log details when a lock acquisition
+ * fails with dontWait = true.
*/
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
bool sessionLock,
bool dontWait,
bool reportMemoryError,
- LOCALLOCK **locallockp)
+ LOCALLOCK **locallockp,
+ bool logLockFailure)
{
LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
LockMethod lockMethodTable;
if (dontWait)
{
+ /*
+ * Log lock holders and waiters as a detail log message if
+ * logLockFailure = true and lock acquisition fails with dontWait
+ * = true
+ */
+ if (logLockFailure)
+ {
+ StringInfoData buf,
+ lock_waiters_sbuf,
+ lock_holders_sbuf;
+ const char *modename;
+ int lockHoldersNum = 0;
+
+ initStringInfo(&buf);
+ initStringInfo(&lock_waiters_sbuf);
+ initStringInfo(&lock_holders_sbuf);
+
+ DescribeLockTag(&buf, &locallock->tag.lock);
+ modename = GetLockmodeName(locallock->tag.lock.locktag_lockmethodid,
+ lockmode);
+
+ /* Gather a list of all lock holders and waiters */
+ LWLockAcquire(partitionLock, LW_SHARED);
+ GetLockHoldersAndWaiters(locallock, &lock_holders_sbuf,
+ &lock_waiters_sbuf, &lockHoldersNum);
+ LWLockRelease(partitionLock);
+
+ ereport(LOG,
+ (errmsg("process %d could not obtain %s on %s",
+ MyProcPid, modename, buf.data),
+ errdetail_log_plural(
+ "Process holding the lock: %s, Wait queue: %s.",
+ "Processes holding the lock: %s, Wait queue: %s.",
+ lockHoldersNum,
+ lock_holders_sbuf.data,
+ lock_waiters_sbuf.data)));
+
+ pfree(buf.data);
+ pfree(lock_holders_sbuf.data);
+ pfree(lock_waiters_sbuf.data);
+ }
if (locallockp)
*locallockp = NULL;
return LOCKACQUIRE_NOT_AVAIL;
long secs;
int usecs;
long msecs;
- dlist_iter proc_iter;
- PROCLOCK *curproclock;
- bool first_holder = true,
- first_waiter = true;
int lockHoldersNum = 0;
initStringInfo(&buf);
msecs = secs * 1000 + usecs / 1000;
usecs = usecs % 1000;
- /*
- * we loop over the lock's procLocks to gather a list of all
- * holders and waiters. Thus we will be able to provide more
- * detailed information for lock debugging purposes.
- *
- * lock->procLocks contains all processes which hold or wait for
- * this lock.
- */
-
+ /* Gather a list of all lock holders and waiters */
LWLockAcquire(partitionLock, LW_SHARED);
-
- dlist_foreach(proc_iter, &lock->procLocks)
- {
- curproclock =
- dlist_container(PROCLOCK, lockLink, proc_iter.cur);
-
- /*
- * we are a waiter if myProc->waitProcLock == curproclock; we
- * are a holder if it is NULL or something different
- */
- if (curproclock->tag.myProc->waitProcLock == curproclock)
- {
- if (first_waiter)
- {
- appendStringInfo(&lock_waiters_sbuf, "%d",
- curproclock->tag.myProc->pid);
- first_waiter = false;
- }
- else
- appendStringInfo(&lock_waiters_sbuf, ", %d",
- curproclock->tag.myProc->pid);
- }
- else
- {
- if (first_holder)
- {
- appendStringInfo(&lock_holders_sbuf, "%d",
- curproclock->tag.myProc->pid);
- first_holder = false;
- }
- else
- appendStringInfo(&lock_holders_sbuf, ", %d",
- curproclock->tag.myProc->pid);
-
- lockHoldersNum++;
- }
- }
-
+ GetLockHoldersAndWaiters(locallock, &lock_holders_sbuf,
+ &lock_waiters_sbuf, &lockHoldersNum);
LWLockRelease(partitionLock);
if (deadlock_state == DS_SOFT_DEADLOCK)
errno = save_errno;
}
+/*
+ * GetLockHoldersAndWaiters - get lock holders and waiters for a lock
+ *
+ * Fill lock_holders_sbuf and lock_waiters_sbuf with the PIDs of processes holding
+ * and waiting for the lock, and set lockHoldersNum to the number of lock holders.
+ *
+ * The lock table's partition lock must be held on entry and remains held on exit.
+ */
+void
+GetLockHoldersAndWaiters(LOCALLOCK *locallock, StringInfo lock_holders_sbuf,
+ StringInfo lock_waiters_sbuf, int *lockHoldersNum)
+{
+ dlist_iter proc_iter;
+ PROCLOCK *curproclock;
+ LOCK *lock = locallock->lock;
+ bool first_holder = true,
+ first_waiter = true;
+
+#ifdef USE_ASSERT_CHECKING
+ {
+ uint32 hashcode = locallock->hashcode;
+ LWLock *partitionLock = LockHashPartitionLock(hashcode);
+
+ Assert(LWLockHeldByMe(partitionLock));
+ }
+#endif
+
+ *lockHoldersNum = 0;
+
+ /*
+ * Loop over the lock's procLocks to gather a list of all holders and
+ * waiters. Thus we will be able to provide more detailed information for
+ * lock debugging purposes.
+ *
+ * lock->procLocks contains all processes which hold or wait for this
+ * lock.
+ */
+ dlist_foreach(proc_iter, &lock->procLocks)
+ {
+ curproclock =
+ dlist_container(PROCLOCK, lockLink, proc_iter.cur);
+
+ /*
+ * We are a waiter if myProc->waitProcLock == curproclock; we are a
+ * holder if it is NULL or something different.
+ */
+ if (curproclock->tag.myProc->waitProcLock == curproclock)
+ {
+ if (first_waiter)
+ {
+ appendStringInfo(lock_waiters_sbuf, "%d",
+ curproclock->tag.myProc->pid);
+ first_waiter = false;
+ }
+ else
+ appendStringInfo(lock_waiters_sbuf, ", %d",
+ curproclock->tag.myProc->pid);
+ }
+ else
+ {
+ if (first_holder)
+ {
+ appendStringInfo(lock_holders_sbuf, "%d",
+ curproclock->tag.myProc->pid);
+ first_holder = false;
+ }
+ else
+ appendStringInfo(lock_holders_sbuf, ", %d",
+ curproclock->tag.myProc->pid);
+
+ (*lockHoldersNum)++;
+ }
+ }
+}
+
/*
* ProcWaitForSignal - wait for a signal from another backend.
*
false,
NULL, NULL, NULL
},
+ {
+ {"log_lock_failure", PGC_SUSET, LOGGING_WHAT,
+ gettext_noop("Logs lock failures."),
+ NULL
+ },
+ &log_lock_failure,
+ false,
+ NULL, NULL, NULL
+ },
{
{"log_recovery_conflict_waits", PGC_SIGHUP, LOGGING_WHAT,
gettext_noop("Logs standby recovery conflict waits."),
# %% = '%'
# e.g. '<%u%%%d> '
#log_lock_waits = off # log lock waits >= deadlock_timeout
+#log_lock_failure = off # log lock failures
#log_recovery_conflict_waits = off # log standby recovery conflict waits
# >= deadlock_timeout
#log_parameter_max_length = -1 # when logging statements, limit logged
/* Lock a tuple (see heap_lock_tuple before assuming you understand this) */
extern void LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode);
extern bool ConditionalLockTuple(Relation relation, ItemPointer tid,
- LOCKMODE lockmode);
+ LOCKMODE lockmode, bool logLockFailure);
extern void UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode);
/* Lock an XID (used to wait for a transaction to finish) */
extern void XactLockTableDelete(TransactionId xid);
extern void XactLockTableWait(TransactionId xid, Relation rel,
ItemPointer ctid, XLTW_Oper oper);
-extern bool ConditionalXactLockTableWait(TransactionId xid);
+extern bool ConditionalXactLockTableWait(TransactionId xid,
+ bool logLockFailure);
/* Lock VXIDs, specified by conflicting locktags */
extern void WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode, bool progress);
/* GUC variables */
extern PGDLLIMPORT int max_locks_per_xact;
+extern PGDLLIMPORT bool log_lock_failure;
#ifdef LOCK_DEBUG
extern PGDLLIMPORT int Trace_lock_oidmin;
bool sessionLock,
bool dontWait,
bool reportMemoryError,
- LOCALLOCK **locallockp);
+ LOCALLOCK **locallockp,
+ bool logLockFailure);
extern void AbortStrongLockAcquire(void);
extern void MarkLockClear(LOCALLOCK *locallock);
extern bool LockRelease(const LOCKTAG *locktag,
extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
extern void CheckDeadLockAlert(void);
extern void LockErrorCleanup(void);
+extern void GetLockHoldersAndWaiters(LOCALLOCK *locallock,
+ StringInfo lock_holders_sbuf,
+ StringInfo lock_waiters_sbuf,
+ int *lockHoldersNum);
extern void ProcWaitForSignal(uint32 wait_event_info);
extern void ProcSendSignal(ProcNumber procNumber);