Add some assertions in syncrep.c
authorMichael Paquier <michael@paquier.xyz>
Fri, 1 Nov 2019 13:51:05 +0000 (22:51 +0900)
committerMichael Paquier <michael@paquier.xyz>
Fri, 1 Nov 2019 13:51:05 +0000 (22:51 +0900)
A couple of routines assume that the LWLock SyncRepLock needs to be
taken, so add a couple of assertions to be sure of that.  Also, when
waiting for a given LSN at transaction commit, the code implied that the
syncrep queue cleanup happens while holding interrupts, but the code
never checked after that.

Author: Michael Paquier
Reviewed-by: Fujii Masao, Kyotaro Horiguchi, Dongming Liu
Discussion: https://postgr.es/m/a0806273-8bbb-43b3-bbe1-c45a58f6ae21.lingce.ldm@alibaba-inc.com

src/backend/replication/syncrep.c

index 59a3d5bda6b8131f403181ee197cb43d1a3a4e7d..7bf7a1b7f8b71b69e8a69faa5831ae8912637862 100644 (file)
@@ -149,6 +149,13 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
    const char *old_status;
    int         mode;
 
+   /*
+    * This should be called while holding interrupts during a transaction
+    * commit to prevent the follow-up shared memory queue cleanups to be
+    * influenced by external interruptions.
+    */
+   Assert(InterruptHoldoffCount > 0);
+
    /* Cap the level for anything other than commit to remote flush only. */
    if (commit)
        mode = SyncRepWaitMode;
@@ -516,6 +523,8 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
+ * The caller must hold SyncRepLock.
+ *
  * Return false if the number of sync standbys is less than
  * synchronous_standby_names specifies. Otherwise return true and
  * store the positions into *writePtr, *flushPtr and *applyPtr.
@@ -529,6 +538,8 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 {
    List       *sync_standbys;
 
+   Assert(LWLockHeldByMe(SyncRepLock));
+
    *writePtr = InvalidXLogRecPtr;
    *flushPtr = InvalidXLogRecPtr;
    *applyPtr = InvalidXLogRecPtr;
@@ -688,6 +699,8 @@ cmp_lsn(const void *a, const void *b)
 List *
 SyncRepGetSyncStandbys(bool *am_sync)
 {
+   Assert(LWLockHeldByMe(SyncRepLock));
+
    /* Set default result */
    if (am_sync != NULL)
        *am_sync = false;
@@ -992,7 +1005,7 @@ SyncRepGetStandbyPriority(void)
  * Pass all = true to wake whole queue; otherwise, just wake up to
  * the walsender's LSN.
  *
- * Must hold SyncRepLock.
+ * The caller must hold SyncRepLock in exclusive mode.
  */
 static int
 SyncRepWakeQueue(bool all, int mode)
@@ -1003,6 +1016,7 @@ SyncRepWakeQueue(bool all, int mode)
    int         numprocs = 0;
 
    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+   Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
    Assert(SyncRepQueueIsOrderedByLSN(mode));
 
    proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),