* When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
* entries of other backends and also change the head pointer. When holding
* both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointer.
+ * can change the tail pointers.
*
* NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
 * In order to avoid deadlocks, whenever we need multiple locks, we first get
 * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock.
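The tail-advance path further down follows exactly this order; here is a minimal sketch of the acquisition pattern (function bodies elided; since NotifySLRULock is the SLRU control lock, SimpleLruTruncate() acquires it internally, so it naturally comes last):

	/* Sketch only: lock acquisition order used when advancing the tail. */
	LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);	/* first */
	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);		/* second */
	/* ... read or update head, tail, stopPage here ... */
	LWLockRelease(NotifyQueueLock);
	/* SimpleLruTruncate() may run here; it takes NotifySLRULock last */
	LWLockRelease(NotifyQueueTailLock);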
QueuePosition head; /* head points to the next free location */
QueuePosition tail; /* tail must be <= the queue position of every
* listening backend */
+ int stopPage; /* oldest unrecycled page; must be <=
+ * tail.page */
BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
+#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
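With the new field, QUEUE_TAIL is the logical tail and QUEUE_STOP_PAGE the physical one. As an illustration only (not part of the patch; the helper name is hypothetical), the documented "stopPage <= tail.page" rule can be written in terms of these macros and the circular page comparator used elsewhere in this file:

	/* Sketch only: check the documented relationship between the tails. */
	static void
	AssertQueueStopPageInvariant(void)
	{
		/* the oldest unrecycled page never passes the logical tail */
		Assert(QUEUE_STOP_PAGE == QUEUE_POS_PAGE(QUEUE_TAIL) ||
			   asyncQueuePagePrecedes(QUEUE_STOP_PAGE,
									  QUEUE_POS_PAGE(QUEUE_TAIL)));
	}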
/* First time through, so initialize it */
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+ QUEUE_STOP_PAGE = 0;
QUEUE_FIRST_LISTENER = InvalidBackendId;
asyncQueueControl->lastQueueFillWarn = 0;
/* zero'th entry won't be used, but let's initialize it anyway */
nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
if (nexthead > QUEUE_MAX_PAGE)
nexthead = 0; /* wrap around */
- boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
+ boundary = QUEUE_STOP_PAGE;
boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
return asyncQueuePagePrecedes(nexthead, boundary);
}
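To make the rounding above concrete, here is a tiny standalone example with invented numbers (the real SLRU_PAGES_PER_SEGMENT comes from slru.h; 32 is assumed here as a stand-in):

	#include <stdio.h>

	#define SLRU_PAGES_PER_SEGMENT 32	/* assumed stand-in */

	int
	main(void)
	{
		int		stopPage = 70;		/* pretend QUEUE_STOP_PAGE */
		int		boundary = stopPage;

		boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
		/* 70 - (70 % 32) = 64: start of the segment holding the stop page */
		printf("boundary = %d\n", boundary);
		return 0;
	}

The queue is then reported full once the would-be next head page logically precedes that boundary, so the head never runs into a segment that has not been recycled yet.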
* Return the fraction of the queue that is currently occupied.
*
* The caller must hold NotifyQueueLock in (at least) shared mode.
+ *
+ * Note: we measure the distance to the logical tail page, not the physical
+ * tail page. In some sense that's wrong, but the relative position of the
+ * physical tail is affected by details such as SLRU segment boundaries,
+ * so that a result based on that is unpleasantly unstable.
*/
static double
asyncQueueUsage(void)
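The body of asyncQueueUsage() is not part of this hunk; the following is a minimal sketch of the idea, assuming QUEUE_MAX_PAGE and the accessors above (the in-tree code may differ in detail):

	static double
	asyncQueueUsage(void)
	{
		int		headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
		int		tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);	/* logical tail */
		int		occupied = headPage - tailPage;

		if (occupied == 0)
			return (double) 0;	/* fast exit for common case */
		if (occupied < 0)
			occupied += QUEUE_MAX_PAGE + 1; /* head wrapped, tail not yet */

		/* at most half of the page space can ever be in use at once */
		return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
	}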
/* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
- /* Compute the new tail. */
+ /*
+ * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
+ * (ie, exactly match at least one backend's queue position), so it must
+ * be updated atomically with the actual computation. Since v13, we could
+ * get away with not doing it like that, but it seems prudent to keep it
+ * so.
+ *
+ * Also, because incoming backends will scan forward from QUEUE_TAIL, that
+ * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
+ * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
+ * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
+ * there are pages we can truncate but haven't yet finished doing so.
+ *
+ * For concurrency's sake, we don't want to hold NotifyQueueLock while
+ * performing SimpleLruTruncate. This is OK because no backend will try
+ * to access the pages we are in the midst of truncating.
+ */
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
}
- oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
+ QUEUE_TAIL = min;
+ oldtailpage = QUEUE_STOP_PAGE;
LWLockRelease(NotifyQueueLock);
 /*
  * SimpleLruTruncate() will ask for NotifySLRULock but will also
  * release the lock again.
  */
SimpleLruTruncate(NotifyCtl, newtailpage);
- }
- /*
- * Advertise the new tail. This changes asyncQueueIsFull()'s verdict for
- * the segment immediately prior to the new tail, allowing fresh data into
- * that segment.
- */
- LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- QUEUE_TAIL = min;
- LWLockRelease(NotifyQueueLock);
+ /*
+ * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
+ * for the segment immediately prior to the old tail, allowing fresh
+ * data into that segment.
+ */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ QUEUE_STOP_PAGE = newtailpage;
+ LWLockRelease(NotifyQueueLock);
+ }
LWLockRelease(NotifyQueueTailLock);
}
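Taken together, the tail advance is now two-phase. A hypothetical timeline with invented page numbers (32-page segments assumed):

	/*
	 * on entry:        QUEUE_TAIL.page = 40    QUEUE_STOP_PAGE = 40
	 * every listener has read past page 70, so min falls on page 70
	 * after 1st lock:  QUEUE_TAIL.page = 70    QUEUE_STOP_PAGE = 40
	 *     logical tail advanced; pages 40..69 are now dead but still
	 *     on disk, and incoming listeners start scanning from page 70
	 * SimpleLruTruncate(NotifyCtl, 70) then drops the segments wholly
	 *     before page 70 (pages 0..63), with no NotifyQueueLock held
	 * after 2nd lock:  QUEUE_TAIL.page = 70    QUEUE_STOP_PAGE = 70
	 *     physical tail advertised; asyncQueueIsFull() now rounds 70
	 *     down to the segment boundary 64, letting the head reuse the
	 *     freed space up to (circularly) just before page 64
	 */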