Convert the PGPROC->lwWaitLink list into a dlist instead of open coding it.
authorAndres Freund <andres@anarazel.de>
Thu, 25 Dec 2014 16:24:30 +0000 (17:24 +0100)
committerAndres Freund <andres@anarazel.de>
Thu, 25 Dec 2014 16:24:30 +0000 (17:24 +0100)
Besides being shorter and much easier to read it changes the logic in
LWLockRelease() to release all shared lockers when waking up any. This
can yield some significant performance improvements - and the fairness
isn't really much worse than before, as we always allowed new shared
lockers to jump the queue.

src/backend/access/transam/twophase.c
src/backend/storage/lmgr/lwlock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lwlock.h
src/include/storage/proc.h

index 40de84e934ed28a49ba2dcb4e15f5ca893b56c60..ad3e87271816e8773b3832480d0a0eb0be65d4b8 100644 (file)
@@ -390,7 +390,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
    proc->roleId = owner;
    proc->lwWaiting = false;
    proc->lwWaitMode = 0;
-   proc->lwWaitLink = NULL;
    proc->waitLock = NULL;
    proc->waitProcLock = NULL;
    for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
index 43f4d6b6534bc46de60762c1e8c1bce165069688..d9d704461b7aebb97f2f7192ff46e36a828a6d78 100644 (file)
@@ -117,9 +117,9 @@ inline static void
 PRINT_LWDEBUG(const char *where, const LWLock *lock)
 {
    if (Trace_lwlocks)
-       elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d",
+       elog(LOG, "%s(%s %d): excl %d shared %d rOK %d",
             where, T_NAME(lock), T_ID(lock),
-            (int) lock->exclusive, lock->shared, lock->head,
+            (int) lock->exclusive, lock->shared,
             (int) lock->releaseOK);
 }
 
@@ -479,8 +479,7 @@ LWLockInitialize(LWLock *lock, int tranche_id)
    lock->exclusive = 0;
    lock->shared = 0;
    lock->tranche = tranche_id;
-   lock->head = NULL;
-   lock->tail = NULL;
+   dlist_init(&lock->waiters);
 }
 
 
@@ -619,12 +618,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 
        proc->lwWaiting = true;
        proc->lwWaitMode = mode;
-       proc->lwWaitLink = NULL;
-       if (lock->head == NULL)
-           lock->head = proc;
-       else
-           lock->tail->lwWaitLink = proc;
-       lock->tail = proc;
+       dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
        /* Can release the mutex now */
        SpinLockRelease(&lock->mutex);
@@ -840,12 +834,7 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 
        proc->lwWaiting = true;
        proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-       proc->lwWaitLink = NULL;
-       if (lock->head == NULL)
-           lock->head = proc;
-       else
-           lock->tail->lwWaitLink = proc;
-       lock->tail = proc;
+       dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
        /* Can release the mutex now */
        SpinLockRelease(&lock->mutex);
@@ -1002,10 +991,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
        proc->lwWaiting = true;
        proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
        /* waiters are added to the front of the queue */
-       proc->lwWaitLink = lock->head;
-       if (lock->head == NULL)
-           lock->tail = proc;
-       lock->head = proc;
+       dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
        /*
         * Set releaseOK, to make sure we get woken up as soon as the lock is
@@ -1087,9 +1073,10 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 void
 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
-   PGPROC     *head;
-   PGPROC     *proc;
-   PGPROC     *next;
+   dlist_head  wakeup;
+   dlist_mutable_iter iter;
+
+   dlist_init(&wakeup);
 
    /* Acquire mutex.  Time spent holding mutex should be short! */
    SpinLockAcquire(&lock->mutex);
@@ -1104,24 +1091,16 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
     * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
     * up. They are always in the front of the queue.
     */
-   head = lock->head;
-
-   if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+   dlist_foreach_modify(iter, &lock->waiters)
    {
-       proc = head;
-       next = proc->lwWaitLink;
-       while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
-       {
-           proc = next;
-           next = next->lwWaitLink;
-       }
+       PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+
+       if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+           break;
 
-       /* proc is now the last PGPROC to be released */
-       lock->head = next;
-       proc->lwWaitLink = NULL;
+       dlist_delete(&waiter->lwWaitLink);
+       dlist_push_tail(&wakeup, &waiter->lwWaitLink);
    }
-   else
-       head = NULL;
 
    /* We are done updating shared state of the lock itself. */
    SpinLockRelease(&lock->mutex);
@@ -1129,15 +1108,14 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
    /*
     * Awaken any waiters I removed from the queue.
     */
-   while (head != NULL)
+   dlist_foreach_modify(iter, &wakeup)
    {
-       proc = head;
-       head = proc->lwWaitLink;
-       proc->lwWaitLink = NULL;
+       PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+       dlist_delete(&waiter->lwWaitLink);
        /* check comment in LWLockRelease() about this barrier */
        pg_write_barrier();
-       proc->lwWaiting = false;
-       PGSemaphoreUnlock(&proc->sem);
+       waiter->lwWaiting = false;
+       PGSemaphoreUnlock(&waiter->sem);
    }
 }
 
@@ -1148,10 +1126,12 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 void
 LWLockRelease(LWLock *lock)
 {
-   PGPROC     *head;
-   PGPROC     *proc;
+   dlist_head  wakeup;
+   dlist_mutable_iter iter;
    int         i;
 
+   dlist_init(&wakeup);
+
    PRINT_LWDEBUG("LWLockRelease", lock);
 
    /*
@@ -1187,58 +1167,39 @@ LWLockRelease(LWLock *lock)
     * if someone has already awakened waiters that haven't yet acquired the
     * lock.
     */
-   head = lock->head;
-   if (head != NULL)
+   if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
    {
-       if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
+       /*
+        * Remove the to-be-awakened PGPROCs from the queue.
+        */
+       bool        releaseOK = true;
+       bool        wokeup_somebody = false;
+
+       dlist_foreach_modify(iter, &lock->waiters)
        {
-           /*
-            * Remove the to-be-awakened PGPROCs from the queue.
-            */
-           bool        releaseOK = true;
+           PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 
-           proc = head;
+           if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
+               continue;
 
-           /*
-            * First wake up any backends that want to be woken up without
-            * acquiring the lock.
-            */
-           while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-               proc = proc->lwWaitLink;
+           dlist_delete(&waiter->lwWaitLink);
+           dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 
            /*
-            * If the front waiter wants exclusive lock, awaken him only.
-            * Otherwise awaken as many waiters as want shared access.
+            * Prevent additional wakeups until retryer gets to
+            * run. Backends that are just waiting for the lock to become
+            * free don't retry automatically.
             */
-           if (proc->lwWaitMode != LW_EXCLUSIVE)
+           if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
            {
-               while (proc->lwWaitLink != NULL &&
-                      proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
-               {
-                   if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-                       releaseOK = false;
-                   proc = proc->lwWaitLink;
-               }
-           }
-           /* proc is now the last PGPROC to be released */
-           lock->head = proc->lwWaitLink;
-           proc->lwWaitLink = NULL;
-
-           /*
-            * Prevent additional wakeups until retryer gets to run. Backends
-            * that are just waiting for the lock to become free don't retry
-            * automatically.
-            */
-           if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
                releaseOK = false;
+               wokeup_somebody = true;
+           }
 
-           lock->releaseOK = releaseOK;
-       }
-       else
-       {
-           /* lock is still held, can't awaken anything */
-           head = NULL;
+           if(waiter->lwWaitMode == LW_EXCLUSIVE)
+               break;
        }
+       lock->releaseOK = releaseOK;
    }
 
    /* We are done updating shared state of the lock itself. */
@@ -1249,13 +1210,12 @@ LWLockRelease(LWLock *lock)
    /*
     * Awaken any waiters I removed from the queue.
     */
-   while (head != NULL)
+   dlist_foreach_modify(iter, &wakeup)
    {
+       PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
        LOG_LWDEBUG("LWLockRelease", T_NAME(lock), T_ID(lock),
                    "release waiter");
-       proc = head;
-       head = proc->lwWaitLink;
-       proc->lwWaitLink = NULL;
+       dlist_delete(&waiter->lwWaitLink);
        /*
         * Guarantee that lwWaiting being unset only becomes visible once the
         * unlink from the link has completed. Otherwise the target backend
@@ -1267,8 +1227,8 @@ LWLockRelease(LWLock *lock)
         * another lock.
         */
        pg_write_barrier();
-       proc->lwWaiting = false;
-       PGSemaphoreUnlock(&proc->sem);
+       waiter->lwWaiting = false;
+       PGSemaphoreUnlock(&waiter->sem);
    }
 
    /*
index ea88a241443f393995fc1a316f13cc41ae270af3..a4789fcdc87a9cbf253892b98be9e3bae75c1883 100644 (file)
@@ -372,7 +372,6 @@ InitProcess(void)
        MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
    MyProc->lwWaiting = false;
    MyProc->lwWaitMode = 0;
-   MyProc->lwWaitLink = NULL;
    MyProc->waitLock = NULL;
    MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
@@ -535,7 +534,6 @@ InitAuxiliaryProcess(void)
    MyPgXact->vacuumFlags = 0;
    MyProc->lwWaiting = false;
    MyProc->lwWaitMode = 0;
-   MyProc->lwWaitLink = NULL;
    MyProc->waitLock = NULL;
    MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
index 09654a873e3ded3c170cdbf327c0d7f598722b25..c84970a7add8c51c99cd3de75c163d5a41f352d8 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef LWLOCK_H
 #define LWLOCK_H
 
+#include "lib/ilist.h"
 #include "storage/s_lock.h"
 
 struct PGPROC;
@@ -50,9 +51,7 @@ typedef struct LWLock
    char        exclusive;      /* # of exclusive holders (0 or 1) */
    int         shared;         /* # of shared holders (0..MaxBackends) */
    int         tranche;        /* tranche ID */
-   struct PGPROC *head;        /* head of list of waiting PGPROCs */
-   struct PGPROC *tail;        /* tail of list of waiting PGPROCs */
-   /* tail is undefined when head is NULL */
+   dlist_head  waiters;        /* list of waiting PGPROCs */
 } LWLock;
 
 /*
index 4ad4164927ecd76f3250438b48c1c0e05287b7b9..e868f0cd9f1e0486a7659b5e8ff669d44fa457e2 100644 (file)
@@ -15,6 +15,7 @@
 #define _PROC_H_
 
 #include "access/xlogdefs.h"
+#include "lib/ilist.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -104,7 +105,7 @@ struct PGPROC
    /* Info about LWLock the process is currently waiting for, if any. */
    bool        lwWaiting;      /* true if waiting for an LW lock */
    uint8       lwWaitMode;     /* lwlock mode being waited for */
-   struct PGPROC *lwWaitLink;  /* next waiter for same LW lock */
+   dlist_node  lwWaitLink;     /* position in LW lock wait list */
 
    /* Info about lock the process is currently waiting for, if any. */
    /* waitLock and waitProcLock are NULL if not currently waiting. */