if (stmt->drop_slot)
PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT");
- rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
+ /*
+ * Lock pg_subscription with AccessExclusiveLock to ensure
+ * that the launcher doesn't restart new worker during dropping
+ * the subscription
+ */
+ rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
CStringGetDatum(stmt->subname));
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
- /* Protect against launcher restarting the worker. */
- LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
-
/* Kill the apply worker so that the slot becomes accessible. */
logicalrep_worker_stop(subid);
- LWLockRelease(LogicalRepLauncherLock);
-
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);
/*
* Stop the logical replication worker and wait until it detaches from the
* slot.
- *
- * The caller must hold LogicalRepLauncherLock to ensure that new workers are
- * not being started during this function call.
*/
void
logicalrep_worker_stop(Oid subid)
{
LogicalRepWorker *worker;
- Assert(LWLockHeldByMe(LogicalRepLauncherLock));
-
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid);
ALLOCSET_DEFAULT_MAXSIZE);
oldctx = MemoryContextSwitchTo(subctx);
- /* Block any concurrent DROP SUBSCRIPTION. */
- LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
-
/* search for subscriptions to start or stop. */
sublist = get_subscription_list();
}
}
- LWLockRelease(LogicalRepLauncherLock);
-
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */