From 5a3a95385bd5a8f1a4fd50545b7efe9338581899 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sun, 22 Jan 2023 14:08:46 -0500 Subject: [PATCH] Track logrep apply workers' last start times to avoid useless waits. Enforce wal_retrieve_retry_interval on a per-subscription basis, rather than globally, and arrange to skip that delay in case of an intentional worker exit. This probably makes little difference in the field, where apply workers wouldn't be restarted often; but it has a significant impact on the runtime of our logical replication regression tests (even though those tests use artificially-small wal_retrieve_retry_interval settings already). Nathan Bossart, with mostly-cosmetic editorialization by me Discussion: https://postgr.es/m/20221122004119.GA132961@nathanxps13 --- doc/src/sgml/config.sgml | 4 + doc/src/sgml/monitoring.sgml | 10 + src/backend/commands/subscriptioncmds.c | 10 + src/backend/replication/logical/launcher.c | 232 +++++++++++++++----- src/backend/replication/logical/tablesync.c | 8 + src/backend/replication/logical/worker.c | 20 ++ src/backend/storage/lmgr/lwlock.c | 4 + src/include/replication/logicallauncher.h | 2 + src/include/storage/lwlock.h | 2 + 9 files changed, 243 insertions(+), 49 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index dc9b78b0b7..f985afc009 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4877,6 +4877,10 @@ ANY num_sync ( last_start_dsa = DSM_HANDLE_INVALID; + LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID; + /* Initialize memory and spin locks for each worker slot. */ for (slot = 0; slot < max_logical_replication_workers; slot++) { @@ -905,6 +936,105 @@ ApplyLauncherShmemInit(void) } } +/* + * Initialize or attach to the dynamic shared hash table that stores the + * last-start times, if not already done. + * This must be called before accessing the table. + */ +static void +logicalrep_launcher_attach_dshmem(void) +{ + MemoryContext oldcontext; + + /* Quick exit if we already did this. */ + if (LogicalRepCtx->last_start_dsh != DSM_HANDLE_INVALID && + last_start_times != NULL) + return; + + /* Otherwise, use a lock to ensure only one process creates the table. */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + /* Be sure any local memory allocated by DSA routines is persistent. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID) + { + /* Initialize dynamic shared hash table for last-start times. */ + last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA); + dsa_pin(last_start_times_dsa); + dsa_pin_mapping(last_start_times_dsa); + last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0); + + /* Store handles in shared memory for other backends to use. */ + LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa); + LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times); + } + else if (!last_start_times) + { + /* Attach to existing dynamic shared hash table. */ + last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa); + dsa_pin_mapping(last_start_times_dsa); + last_start_times = dshash_attach(last_start_times_dsa, &dsh_params, + LogicalRepCtx->last_start_dsh, 0); + } + + MemoryContextSwitchTo(oldcontext); + LWLockRelease(LogicalRepWorkerLock); +} + +/* + * Set the last-start time for the subscription. + */ +static void +ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time) +{ + LauncherLastStartTimesEntry *entry; + bool found; + + logicalrep_launcher_attach_dshmem(); + + entry = dshash_find_or_insert(last_start_times, &subid, &found); + entry->last_start_time = start_time; + dshash_release_lock(last_start_times, entry); +} + +/* + * Return the last-start time for the subscription, or 0 if there isn't one. + */ +static TimestampTz +ApplyLauncherGetWorkerStartTime(Oid subid) +{ + LauncherLastStartTimesEntry *entry; + TimestampTz ret; + + logicalrep_launcher_attach_dshmem(); + + entry = dshash_find(last_start_times, &subid, false); + if (entry == NULL) + return 0; + + ret = entry->last_start_time; + dshash_release_lock(last_start_times, entry); + + return ret; +} + +/* + * Remove the last-start-time entry for the subscription, if one exists. + * + * This has two use-cases: to remove the entry related to a subscription + * that's been deleted or disabled (just to avoid leaking shared memory), + * and to allow immediate restart of an apply worker that has exited + * due to subscription parameter changes. + */ +void +ApplyLauncherForgetWorkerStartTime(Oid subid) +{ + logicalrep_launcher_attach_dshmem(); + + (void) dshash_delete_key(last_start_times, &subid); +} + /* * Wakeup the launcher on commit if requested. */ @@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { - TimestampTz last_start_time = 0; - ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -976,65 +1104,71 @@ ApplyLauncherMain(Datum main_arg) ListCell *lc; MemoryContext subctx; MemoryContext oldctx; - TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; CHECK_FOR_INTERRUPTS(); - now = GetCurrentTimestamp(); + /* Use temporary context to avoid leaking memory across cycles. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); - /* Limit the start retry to once a wal_retrieve_retry_interval */ - if (TimestampDifferenceExceeds(last_start_time, now, - wal_retrieve_retry_interval)) + /* Start any missing workers for enabled subscriptions. */ + sublist = get_subscription_list(); + foreach(lc, sublist) { - /* Use temporary context for the database list and worker info. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; + TimestampTz now; + long elapsed; - /* search for subscriptions to start or stop. */ - sublist = get_subscription_list(); - - /* Start the missing workers for enabled subscriptions. */ - foreach(lc, sublist) - { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; + if (!sub->enabled) + continue; - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); - - if (w == NULL) - { - last_start_time = now; - wait_time = wal_retrieve_retry_interval; + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, DSM_HANDLE_INVALID); - } - } + if (w != NULL) + continue; /* worker is running already */ - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); - } - else - { /* - * The wait in previous cycle was interrupted in less than - * wal_retrieve_retry_interval since last worker was started, this - * usually means crash of the worker, so we should retry in - * wal_retrieve_retry_interval again. + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started. + * + * Each subscription's apply worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. In cases + * where a restart is expected (e.g., subscription parameter + * changes), another process should remove the last-start entry + * for the subscription so that the worker can be restarted + * without waiting for wal_retrieve_retry_interval to elapse. */ - wait_time = wal_retrieve_retry_interval; + last_start = ApplyLauncherGetWorkerStartTime(sub->oid); + now = GetCurrentTimestamp(); + if (last_start == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) + { + ApplyLauncherSetWorkerStartTime(sub->oid, now); + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + wait_time = Min(wait_time, + wal_retrieve_retry_interval - elapsed); + } } + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); + /* Wait for more work. */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 38dfce7129..4647837b82 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -628,7 +628,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } if (should_exit) + { + /* + * Reset the last-start time for this worker so that the launcher will + * restart it without waiting for wal_retrieve_retry_interval. + */ + ApplyLauncherForgetWorkerStartTime(MySubscription->oid); + proc_exit(0); + } } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a0084c7ef6..cfb2ab6248 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -174,6 +174,7 @@ #include "postmaster/walwriter.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicallauncher.h" #include "replication/logicalproto.h" #include "replication/logicalrelation.h" #include "replication/logicalworker.h" @@ -3811,6 +3812,15 @@ apply_worker_exit(void) return; } + /* + * Reset the last-start time for this apply worker so that the launcher + * will restart it without waiting for wal_retrieve_retry_interval if the + * subscription is still active, and so that we won't leak that hash table + * entry if it isn't. + */ + if (!am_tablesync_worker()) + ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); + proc_exit(0); } @@ -3851,6 +3861,9 @@ maybe_reread_subscription(void) (errmsg("%s for subscription \"%s\" will stop because the subscription was removed", get_worker_name(), MySubscription->name))); + /* Ensure we remove no-longer-useful entry for worker's start time */ + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); proc_exit(0); } @@ -4421,6 +4434,9 @@ InitializeApplyWorker(void) (errmsg("%s for subscription %u will not start because the subscription was removed during startup", get_worker_name(), MyLogicalRepWorker->subid))); + /* Ensure we remove no-longer-useful entry for worker's start time */ + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); proc_exit(0); } @@ -4678,6 +4694,10 @@ DisableSubscriptionAndExit(void) DisableSubscription(MySubscription->oid); CommitTransactionCommand(); + /* Ensure we remove no-longer-useful entry for worker's start time */ + if (!am_tablesync_worker() && !am_parallel_apply_worker()) + ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); + /* Notify the subscription has been disabled and exit */ ereport(LOG, errmsg("subscription \"%s\" has been disabled because of an error", diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 196bece0a3..d2ec396045 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = { "PgStatsHash", /* LWTRANCHE_PGSTATS_DATA: */ "PgStatsData", + /* LWTRANCHE_LAUNCHER_DSA: */ + "LogicalRepLauncherDSA", + /* LWTRANCHE_LAUNCHER_HASH: */ + "LogicalRepLauncherHash", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 360e98702a..a07c9cb311 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -22,6 +22,8 @@ extern void ApplyLauncherMain(Datum main_arg); extern Size ApplyLauncherShmemSize(void); extern void ApplyLauncherShmemInit(void); +extern void ApplyLauncherForgetWorkerStartTime(Oid subid); + extern void ApplyLauncherWakeupAtCommit(void); extern void AtEOXact_ApplyLauncher(bool isCommit); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index e4162db613..d2c7afb8f4 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DSA, LWTRANCHE_PGSTATS_HASH, LWTRANCHE_PGSTATS_DATA, + LWTRANCHE_LAUNCHER_DSA, + LWTRANCHE_LAUNCHER_HASH, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; -- 2.39.5