{
Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
+ worker->type = WORKERTYPE_UNKNOWN;
worker->in_use = false;
worker->proc = NULL;
worker->dbid = InvalidOid;
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && isTablesyncWorker(w))
+ if (isTablesyncWorker(w) && w->subid == subid)
res++;
}
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && isParallelApplyWorker(w))
+ if (isParallelApplyWorker(w) && w->subid == subid)
res++;
}
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_TABLESYNC)
static inline bool
am_tablesync_worker(void)
static inline bool
am_leader_apply_worker(void)
{
+ Assert(MyLogicalRepWorker->in_use);
return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
}
static inline bool
am_parallel_apply_worker(void)
{
+ Assert(MyLogicalRepWorker->in_use);
return isParallelApplyWorker(MyLogicalRepWorker);
}