Track logrep apply workers' last start times to avoid useless waits.
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 22 Jan 2023 19:08:46 +0000 (14:08 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 22 Jan 2023 19:08:46 +0000 (14:08 -0500)
Enforce wal_retrieve_retry_interval on a per-subscription basis,
rather than globally, and arrange to skip that delay in case of
an intentional worker exit.  This probably makes little difference
in the field, where apply workers wouldn't be restarted often;
but it has a significant impact on the runtime of our logical
replication regression tests (even though those tests use
artificially-small wal_retrieve_retry_interval settings already).

Nathan Bossart, with mostly-cosmetic editorialization by me

Discussion: https://postgr.es/m/20221122004119.GA132961@nathanxps13

doc/src/sgml/config.sgml
doc/src/sgml/monitoring.sgml
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/backend/storage/lmgr/lwlock.c
src/include/replication/logicallauncher.h
src/include/storage/lwlock.h

index dc9b78b0b7d4cb0b5a3fde948acfdc1606001444..f985afc009d04677618c27c8d57e88b60ecc0659 100644 (file)
@@ -4877,6 +4877,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
         environments where the number of times an infrastructure is accessed
         is taken into account.
        </para>
+       <para>
+        In logical replication, this parameter also limits how often a failing
+        replication apply worker will be respawned.
+       </para>
       </listitem>
      </varlistentry>
 
index e3a783abd0fee6bd552c9e17db6ca4b7c0f8c8b6..1756f1a4b6795a92839c5431f6f45ee7b582aa2e 100644 (file)
@@ -2008,6 +2008,16 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting to read or update information
        about <quote>heavyweight</quote> locks.</entry>
      </row>
+     <row>
+      <entry><literal>LogicalRepLauncherDSA</literal></entry>
+      <entry>Waiting to access logical replication launcher's dynamic shared
+       memory allocator.</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalRepLauncherHash</literal></entry>
+      <entry>Waiting to access logical replication launcher's shared
+       hash table.</entry>
+     </row>
      <row>
       <entry><literal>LogicalRepWorker</literal></entry>
       <entry>Waiting to read or update the state of logical replication
index baff00dd74ead20da69c11393fb24069a750191e..464db6d247fdc9173f36697e65215eb4b3901512 100644 (file)
@@ -1504,6 +1504,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    }
    list_free(subworkers);
 
+   /*
+    * Remove the no-longer-useful entry in the launcher's table of apply
+    * worker start times.
+    *
+    * If this transaction rolls back, the launcher might restart a failed
+    * apply worker before wal_retrieve_retry_interval milliseconds have
+    * elapsed, but that's pretty harmless.
+    */
+   ApplyLauncherForgetWorkerStartTime(subid);
+
    /*
     * Cleanup of tablesync replication origins.
     *
index 27e58566ceca850a6481aa334b0abed249d6dd47..564bffe5caf20654520f981e494492da5e534afb 100644 (file)
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -64,20 +65,47 @@ typedef struct LogicalRepCtxStruct
    /* Supervisor process. */
    pid_t       launcher_pid;
 
+   /* Hash table holding last start times of subscriptions' apply workers. */
+   dsa_handle  last_start_dsa;
+   dshash_table_handle last_start_dsh;
+
    /* Background workers. */
    LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
 
 static LogicalRepCtxStruct *LogicalRepCtx;
 
+/* an entry in the last-start-times shared hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+   Oid         subid;          /* OID of logrep subscription (hash key) */
+   TimestampTz last_start_time;    /* last time its apply worker was started */
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start-times shared hash table */
+static const dshash_parameters dsh_params = {
+   sizeof(Oid),
+   sizeof(LauncherLastStartTimesEntry),
+   dshash_memcmp,
+   dshash_memhash,
+   LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
+static bool on_commit_launcher_wakeup = false;
+
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 static int logicalrep_pa_worker_count(Oid subid);
-
-static bool on_commit_launcher_wakeup = false;
+static void logicalrep_launcher_attach_dshmem(void);
+static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
+static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
 
 
 /*
@@ -894,6 +922,9 @@ ApplyLauncherShmemInit(void)
 
        memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
 
+       LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+       LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
+
        /* Initialize memory and spin locks for each worker slot. */
        for (slot = 0; slot < max_logical_replication_workers; slot++)
        {
@@ -905,6 +936,105 @@ ApplyLauncherShmemInit(void)
    }
 }
 
+/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done.
+ * This must be called before accessing the table.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+   MemoryContext oldcontext;
+
+   /* Quick exit if we already did this. */
+   if (LogicalRepCtx->last_start_dsh != DSM_HANDLE_INVALID &&
+       last_start_times != NULL)
+       return;
+
+   /* Otherwise, use a lock to ensure only one process creates the table. */
+   LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+   /* Be sure any local memory allocated by DSA routines is persistent. */
+   oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+   if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+   {
+       /* Initialize dynamic shared hash table for last-start times. */
+       last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+       dsa_pin(last_start_times_dsa);
+       dsa_pin_mapping(last_start_times_dsa);
+       last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+       /* Store handles in shared memory for other backends to use. */
+       LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+       LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+   }
+   else if (!last_start_times)
+   {
+       /* Attach to existing dynamic shared hash table. */
+       last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+       dsa_pin_mapping(last_start_times_dsa);
+       last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+                                        LogicalRepCtx->last_start_dsh, 0);
+   }
+
+   MemoryContextSwitchTo(oldcontext);
+   LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
+{
+   LauncherLastStartTimesEntry *entry;
+   bool        found;
+
+   logicalrep_launcher_attach_dshmem();
+
+   entry = dshash_find_or_insert(last_start_times, &subid, &found);
+   entry->last_start_time = start_time;
+   dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+ApplyLauncherGetWorkerStartTime(Oid subid)
+{
+   LauncherLastStartTimesEntry *entry;
+   TimestampTz ret;
+
+   logicalrep_launcher_attach_dshmem();
+
+   entry = dshash_find(last_start_times, &subid, false);
+   if (entry == NULL)
+       return 0;
+
+   ret = entry->last_start_time;
+   dshash_release_lock(last_start_times, entry);
+
+   return ret;
+}
+
+/*
+ * Remove the last-start-time entry for the subscription, if one exists.
+ *
+ * This has two use-cases: to remove the entry related to a subscription
+ * that's been deleted or disabled (just to avoid leaking shared memory),
+ * and to allow immediate restart of an apply worker that has exited
+ * due to subscription parameter changes.
+ */
+void
+ApplyLauncherForgetWorkerStartTime(Oid subid)
+{
+   logicalrep_launcher_attach_dshmem();
+
+   (void) dshash_delete_key(last_start_times, &subid);
+}
+
 /*
  * Wakeup the launcher on commit if requested.
  */
@@ -947,8 +1077,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-   TimestampTz last_start_time = 0;
-
    ereport(DEBUG1,
            (errmsg_internal("logical replication launcher started")));
 
@@ -976,65 +1104,71 @@ ApplyLauncherMain(Datum main_arg)
        ListCell   *lc;
        MemoryContext subctx;
        MemoryContext oldctx;
-       TimestampTz now;
        long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
        CHECK_FOR_INTERRUPTS();
 
-       now = GetCurrentTimestamp();
+       /* Use temporary context to avoid leaking memory across cycles. */
+       subctx = AllocSetContextCreate(TopMemoryContext,
+                                      "Logical Replication Launcher sublist",
+                                      ALLOCSET_DEFAULT_SIZES);
+       oldctx = MemoryContextSwitchTo(subctx);
 
-       /* Limit the start retry to once a wal_retrieve_retry_interval */
-       if (TimestampDifferenceExceeds(last_start_time, now,
-                                      wal_retrieve_retry_interval))
+       /* Start any missing workers for enabled subscriptions. */
+       sublist = get_subscription_list();
+       foreach(lc, sublist)
        {
-           /* Use temporary context for the database list and worker info. */
-           subctx = AllocSetContextCreate(TopMemoryContext,
-                                          "Logical Replication Launcher sublist",
-                                          ALLOCSET_DEFAULT_SIZES);
-           oldctx = MemoryContextSwitchTo(subctx);
+           Subscription *sub = (Subscription *) lfirst(lc);
+           LogicalRepWorker *w;
+           TimestampTz last_start;
+           TimestampTz now;
+           long        elapsed;
 
-           /* search for subscriptions to start or stop. */
-           sublist = get_subscription_list();
-
-           /* Start the missing workers for enabled subscriptions. */
-           foreach(lc, sublist)
-           {
-               Subscription *sub = (Subscription *) lfirst(lc);
-               LogicalRepWorker *w;
+           if (!sub->enabled)
+               continue;
 
-               if (!sub->enabled)
-                   continue;
-
-               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-               w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-               LWLockRelease(LogicalRepWorkerLock);
-
-               if (w == NULL)
-               {
-                   last_start_time = now;
-                   wait_time = wal_retrieve_retry_interval;
+           LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+           w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+           LWLockRelease(LogicalRepWorkerLock);
 
-                   logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-                                            sub->owner, InvalidOid, DSM_HANDLE_INVALID);
-               }
-           }
+           if (w != NULL)
+               continue;       /* worker is running already */
 
-           /* Switch back to original memory context. */
-           MemoryContextSwitchTo(oldctx);
-           /* Clean the temporary memory. */
-           MemoryContextDelete(subctx);
-       }
-       else
-       {
            /*
-            * The wait in previous cycle was interrupted in less than
-            * wal_retrieve_retry_interval since last worker was started, this
-            * usually means crash of the worker, so we should retry in
-            * wal_retrieve_retry_interval again.
+            * If the worker is eligible to start now, launch it.  Otherwise,
+            * adjust wait_time so that we'll wake up as soon as it can be
+            * started.
+            *
+            * Each subscription's apply worker can only be restarted once per
+            * wal_retrieve_retry_interval, so that errors do not cause us to
+            * repeatedly restart the worker as fast as possible.  In cases
+            * where a restart is expected (e.g., subscription parameter
+            * changes), another process should remove the last-start entry
+            * for the subscription so that the worker can be restarted
+            * without waiting for wal_retrieve_retry_interval to elapse.
             */
-           wait_time = wal_retrieve_retry_interval;
+           last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+           now = GetCurrentTimestamp();
+           if (last_start == 0 ||
+               (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+           {
+               ApplyLauncherSetWorkerStartTime(sub->oid, now);
+               logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+                                        sub->owner, InvalidOid,
+                                        DSM_HANDLE_INVALID);
+           }
+           else
+           {
+               wait_time = Min(wait_time,
+                               wal_retrieve_retry_interval - elapsed);
+           }
        }
 
+       /* Switch back to original memory context. */
+       MemoryContextSwitchTo(oldctx);
+       /* Clean the temporary memory. */
+       MemoryContextDelete(subctx);
+
        /* Wait for more work. */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
index 38dfce71296718bfc60206d8cc7db945e35a7642..4647837b82305170f36b063b86a9121126699788 100644 (file)
@@ -628,7 +628,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
    }
 
    if (should_exit)
+   {
+       /*
+        * Reset the last-start time for this worker so that the launcher will
+        * restart it without waiting for wal_retrieve_retry_interval.
+        */
+       ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
+
        proc_exit(0);
+   }
 }
 
 /*
index a0084c7ef69d1cffca891eba1ec537d45d6ebac0..cfb2ab62481975d36a983ec4f0a48e004add07ae 100644 (file)
 #include "postmaster/walwriter.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
@@ -3811,6 +3812,15 @@ apply_worker_exit(void)
        return;
    }
 
+   /*
+    * Reset the last-start time for this apply worker so that the launcher
+    * will restart it without waiting for wal_retrieve_retry_interval if the
+    * subscription is still active, and so that we won't leak that hash table
+    * entry if it isn't.
+    */
+   if (!am_tablesync_worker())
+       ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
+
    proc_exit(0);
 }
 
@@ -3851,6 +3861,9 @@ maybe_reread_subscription(void)
                (errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
                        get_worker_name(), MySubscription->name)));
 
+       /* Ensure we remove no-longer-useful entry for worker's start time */
+       if (!am_tablesync_worker() && !am_parallel_apply_worker())
+           ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
        proc_exit(0);
    }
 
@@ -4421,6 +4434,9 @@ InitializeApplyWorker(void)
                (errmsg("%s for subscription %u will not start because the subscription was removed during startup",
                        get_worker_name(), MyLogicalRepWorker->subid)));
 
+       /* Ensure we remove no-longer-useful entry for worker's start time */
+       if (!am_tablesync_worker() && !am_parallel_apply_worker())
+           ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
        proc_exit(0);
    }
 
@@ -4678,6 +4694,10 @@ DisableSubscriptionAndExit(void)
    DisableSubscription(MySubscription->oid);
    CommitTransactionCommand();
 
+   /* Ensure we remove no-longer-useful entry for worker's start time */
+   if (!am_tablesync_worker() && !am_parallel_apply_worker())
+       ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
+
    /* Notify the subscription has been disabled and exit */
    ereport(LOG,
            errmsg("subscription \"%s\" has been disabled because of an error",
index 196bece0a3d24ecbe96955cf82d411c1abfb55f5..d2ec39604519952ea9b2b89d234e7423f2e29760 100644 (file)
@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
    "PgStatsHash",
    /* LWTRANCHE_PGSTATS_DATA: */
    "PgStatsData",
+   /* LWTRANCHE_LAUNCHER_DSA: */
+   "LogicalRepLauncherDSA",
+   /* LWTRANCHE_LAUNCHER_HASH: */
+   "LogicalRepLauncherHash",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
index 360e98702a8198fea0ed33a84dc47a8479d7e8c4..a07c9cb311a75c895a6dfb941e8034f7e0793f7f 100644 (file)
@@ -22,6 +22,8 @@ extern void ApplyLauncherMain(Datum main_arg);
 extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
+extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
+
 extern void ApplyLauncherWakeupAtCommit(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
index e4162db613c2eb7ce229bea308f17caeec7c1cc9..d2c7afb8f406793692198c1aa3c0a8c0020d510b 100644 (file)
@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
    LWTRANCHE_PGSTATS_DSA,
    LWTRANCHE_PGSTATS_HASH,
    LWTRANCHE_PGSTATS_DATA,
+   LWTRANCHE_LAUNCHER_DSA,
+   LWTRANCHE_LAUNCHER_HASH,
    LWTRANCHE_FIRST_USER_DEFINED
 }          BuiltinTrancheIds;