Add subtransaction handling for table synchronization workers.
authorRobert Haas <rhaas@postgresql.org>
Mon, 16 Jul 2018 21:33:22 +0000 (17:33 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 16 Jul 2018 21:33:22 +0000 (17:33 -0400)
Since the old logic was completely unaware of subtransactions, a
change made in a subsequently-aborted subtransaction would still cause
workers to be stopped at toplevel transaction commit.  Fix that by
managing a stack of worker lists rather than just one.

Amit Khandekar and Robert Haas

Discussion: http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com

src/backend/access/transam/xact.c
src/backend/replication/logical/launcher.c
src/include/replication/logicallauncher.h
src/tools/pgindent/typedefs.list

index 1da1f13ef33f0c882d0203db3fc5e579654293ed..9aa63c8792be56e4186ef2bbfc12198c00d3b6c3 100644 (file)
@@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
    AtEOSubXact_HashTables(true, s->nestingLevel);
    AtEOSubXact_PgStat(true, s->nestingLevel);
    AtSubCommit_Snapshot(s->nestingLevel);
+   AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
    /*
     * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
        AtEOSubXact_HashTables(false, s->nestingLevel);
        AtEOSubXact_PgStat(false, s->nestingLevel);
        AtSubAbort_Snapshot(s->nestingLevel);
+       AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
    }
 
    /*
index 6ef333b7257e993ce842ca91d365ae556d95099c..ada16adb67b1fc50de5cc0e0a07e7ac1316ef9f6 100644 (file)
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
    Oid         relid;
 } LogicalRepWorkerId;
 
-static List *on_commit_stop_workers = NIL;
+typedef struct StopWorkersData
+{
+   int         nestDepth;      /* Sub-transaction nest level */
+   List       *workers;        /* List of LogicalRepWorkerId */
+   struct StopWorkersData *parent; /* This need not be an immediate
+                                    * subtransaction parent */
+} StopWorkersData;
+
+/*
+ * Stack of StopWorkersData elements. Each stack element contains the workers
+ * to be stopped for that subtransaction.
+ */
+static StopWorkersData *on_commit_stop_workers = NULL;
 
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -559,17 +571,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 void
 logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 {
+   int         nestDepth = GetCurrentTransactionNestLevel();
    LogicalRepWorkerId *wid;
    MemoryContext oldctx;
 
    /* Make sure we store the info in context that survives until commit. */
    oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
+   /* Check that previous transactions were properly cleaned up. */
+   Assert(on_commit_stop_workers == NULL ||
+          nestDepth >= on_commit_stop_workers->nestDepth);
+
+   /*
+    * Push a new stack element if we don't already have one for the current
+    * nestDepth.
+    */
+   if (on_commit_stop_workers == NULL ||
+       nestDepth > on_commit_stop_workers->nestDepth)
+   {
+       StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
+
+       newdata->nestDepth = nestDepth;
+       newdata->workers = NIL;
+       newdata->parent = on_commit_stop_workers;
+       on_commit_stop_workers = newdata;
+   }
+
+   /*
+    * Finally add a new worker into the worker list of the current
+    * subtransaction.
+    */
    wid = palloc(sizeof(LogicalRepWorkerId));
    wid->subid = subid;
    wid->relid = relid;
-
-   on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+   on_commit_stop_workers->workers =
+       lappend(on_commit_stop_workers->workers, wid);
 
    MemoryContextSwitchTo(oldctx);
 }
@@ -823,7 +859,7 @@ ApplyLauncherShmemInit(void)
 bool
 XactManipulatesLogicalReplicationWorkers(void)
 {
-   return (on_commit_stop_workers != NIL);
+   return (on_commit_stop_workers != NULL);
 }
 
 /*
@@ -832,15 +868,25 @@ XactManipulatesLogicalReplicationWorkers(void)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+
+   Assert(on_commit_stop_workers == NULL ||
+          (on_commit_stop_workers->nestDepth == 1 &&
+           on_commit_stop_workers->parent == NULL));
+
    if (isCommit)
    {
        ListCell   *lc;
 
-       foreach(lc, on_commit_stop_workers)
+       if (on_commit_stop_workers != NULL)
        {
-           LogicalRepWorkerId *wid = lfirst(lc);
+           List       *workers = on_commit_stop_workers->workers;
+
+           foreach(lc, workers)
+           {
+               LogicalRepWorkerId *wid = lfirst(lc);
 
-           logicalrep_worker_stop(wid->subid, wid->relid);
+               logicalrep_worker_stop(wid->subid, wid->relid);
+           }
        }
 
        if (on_commit_launcher_wakeup)
@@ -851,10 +897,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
     * No need to pfree on_commit_stop_workers.  It was allocated in
     * transaction memory context, which is going to be cleaned soon.
     */
-   on_commit_stop_workers = NIL;
+   on_commit_stop_workers = NULL;
    on_commit_launcher_wakeup = false;
 }
 
+/*
+ * On commit, merge the current on_commit_stop_workers list into the
+ * immediate parent, if present.
+ * On rollback, discard the current on_commit_stop_workers list.
+ * Pop out the stack.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+   StopWorkersData *parent;
+
+   /* Exit immediately if there's no work to do at this level. */
+   if (on_commit_stop_workers == NULL ||
+       on_commit_stop_workers->nestDepth < nestDepth)
+       return;
+
+   Assert(on_commit_stop_workers->nestDepth == nestDepth);
+
+   parent = on_commit_stop_workers->parent;
+
+   if (isCommit)
+   {
+       /*
+        * If the upper stack element is not an immediate parent
+        * subtransaction, just decrement the notional nesting depth without
+        * doing any real work.  Else, we need to merge the current workers
+        * list into the parent.
+        */
+       if (!parent || parent->nestDepth < nestDepth - 1)
+       {
+           on_commit_stop_workers->nestDepth--;
+           return;
+       }
+
+       parent->workers =
+           list_concat(parent->workers, on_commit_stop_workers->workers);
+   }
+   else
+   {
+       /*
+        * Abandon everything that was done at this nesting level.  Explicitly
+        * free memory to avoid a transaction-lifespan leak.
+        */
+       list_free_deep(on_commit_stop_workers->workers);
+   }
+
+   /*
+    * We have taken care of the current subtransaction workers list for both
+    * abort or commit. So we are ready to pop the stack.
+    */
+   pfree(on_commit_stop_workers);
+   on_commit_stop_workers = parent;
+}
+
 /*
  * Request wakeup of the launcher on commit of the transaction.
  *
index ef02512412e97a720f5112e4f86982be021851fb..9f840b7bc1330d58c7cf86195282bd64e43ac6bf 100644 (file)
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 
 extern bool IsLogicalLauncher(void);
 
index 03867cbce558eeb2560957694bd17454bd738738..ed68cc4085e008bbf393ab2b820645740192454f 100644 (file)
@@ -2227,6 +2227,7 @@ StdAnalyzeData
 StdRdOptions
 Step
 StopList
+StopWorkersData
 StrategyNumber
 StreamCtl
 StringInfo