Reset the logical worker type while cleaning up other worker info.
authorAmit Kapila <akapila@postgresql.org>
Fri, 25 Aug 2023 03:27:55 +0000 (08:57 +0530)
committerAmit Kapila <akapila@postgresql.org>
Fri, 25 Aug 2023 03:27:55 +0000 (08:57 +0530)
Commit 2a8b40e36 introduces the worker type field for logical replication
workers, but forgot to reset the type when the worker exits. This can lead
to recognizing a stopped worker as a valid logical replication worker.

Fix it by resetting the worker type and additionally adding the safeguard
to not use LogicalRepWorker until ->in_use is verified.

Reported-by: Thomas Munro based on cfbot reports.
Author: Hou Zhijie, Alvaro Herrera
Reviewed-by: Amit Kapila
Discussion: http://postgr.es/m/CA+hUKGK2RQh4LifVgBmkHsCYChP-65UwGXOmnCzYVa5aAt4GWg@mail.gmail.com

src/backend/replication/logical/launcher.c
src/include/replication/worker_internal.h

index 72e44d5a02dd8f417fb81a938669c8b7b5d6feda..7882fc91ce69655c16f431fa0af3414cf05ab46c 100644 (file)
@@ -793,6 +793,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 {
    Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
 
+   worker->type = WORKERTYPE_UNKNOWN;
    worker->in_use = false;
    worker->proc = NULL;
    worker->dbid = InvalidOid;
@@ -862,7 +863,7 @@ logicalrep_sync_worker_count(Oid subid)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-       if (w->subid == subid && isTablesyncWorker(w))
+       if (isTablesyncWorker(w) && w->subid == subid)
            res++;
    }
 
@@ -889,7 +890,7 @@ logicalrep_pa_worker_count(Oid subid)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-       if (w->subid == subid && isParallelApplyWorker(w))
+       if (isParallelApplyWorker(w) && w->subid == subid)
            res++;
    }
 
index a428663859b04bb338c672ffc8f68e698b59ad8a..8f4bed09585be7b2c6f16073b7b4448b662d0b00 100644 (file)
@@ -327,8 +327,10 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                           XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
+                                      (worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->in_use && \
+                                  (worker)->type == WORKERTYPE_TABLESYNC)
 
 static inline bool
 am_tablesync_worker(void)
@@ -339,12 +341,14 @@ am_tablesync_worker(void)
 static inline bool
 am_leader_apply_worker(void)
 {
+   Assert(MyLogicalRepWorker->in_use);
    return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
 }
 
 static inline bool
 am_parallel_apply_worker(void)
 {
+   Assert(MyLogicalRepWorker->in_use);
    return isParallelApplyWorker(MyLogicalRepWorker);
 }