Improve division of labor between execParallel.c and nodeGather[Merge].c.
author Tom Lane <tgl@sss.pgh.pa.us>
Fri, 1 Sep 2017 21:38:54 +0000 (17:38 -0400)
committer Tom Lane <tgl@sss.pgh.pa.us>
Fri, 1 Sep 2017 21:39:01 +0000 (17:39 -0400)
Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c.  Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.

These changes also eliminate a vestigial memory leak (of the pei->tqueue
array).  It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
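
For orientation, a minimal caller-side sketch of the pattern this commit establishes,
condensed from the nodeGather.c changes below: the node launches the workers, asks
execParallel.c to build the TupleQueueReaders, and keeps only a local working copy of
the reader array; at shutdown it calls ExecParallelFinish(), which now detaches the
shm_mqs and destroys the readers, so the node just frees its local copy.  The wrapper
names gather_start_workers/gather_stop_workers are illustrative only and do not exist
in the tree; the real code is in ExecGather() and ExecShutdownGatherWorkers().

    #include "postgres.h"

    #include "access/parallel.h"
    #include "executor/execParallel.h"
    #include "executor/tqueue.h"
    #include "nodes/execnodes.h"

    /* Illustrative sketch only, not a drop-in function. */
    static void
    gather_start_workers(GatherState *node, ParallelContext *pcxt,
                         TupleDesc tupDesc)
    {
        LaunchParallelWorkers(pcxt);
        node->nworkers_launched = pcxt->nworkers_launched;

        if (pcxt->nworkers_launched > 0)
        {
            /* execParallel.c attaches the queues and builds pei->reader */
            ExecParallelCreateReaders(node->pei, tupDesc);

            /* the node keeps only a local working copy of the reader array */
            node->nreaders = pcxt->nworkers_launched;
            node->reader = (TupleQueueReader **)
                palloc(node->nreaders * sizeof(TupleQueueReader *));
            memcpy(node->reader, node->pei->reader,
                   node->nreaders * sizeof(TupleQueueReader *));
        }
        else
        {
            /* No workers?  Then never mind. */
            node->nreaders = 0;
            node->reader = NULL;
        }
        node->nextreader = 0;
    }

    static void
    gather_stop_workers(GatherState *node)
    {
        /* ExecParallelFinish now detaches the shm_mqs and destroys the readers */
        if (node->pei != NULL)
            ExecParallelFinish(node->pei);

        /* all that remains locally is our working copy of the reader array */
        if (node->reader)
            pfree(node->reader);
        node->reader = NULL;
    }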

src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c
src/backend/executor/nodeGatherMerge.c
src/backend/executor/tqueue.c
src/include/executor/execParallel.h

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index c713b8513995c5a3674770944b063685821a5975..59f3744a147b8e27d40e51cb8e70f20a93f741a4 100644
@@ -534,9 +534,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
    pei->buffer_usage = bufusage_space;
 
-   /* Set up tuple queues. */
+   /* Set up the tuple queues that the workers will write into. */
    pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
+   /* We don't need the TupleQueueReaders yet, though. */
+   pei->reader = NULL;
+
    /*
     * If instrumentation options were supplied, allocate space for the data.
     * It only gets partially initialized here; the rest happens during
@@ -603,6 +606,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
    return pei;
 }
 
+/*
+ * Set up tuple queue readers to read the results of a parallel subplan.
+ * All the workers are expected to return tuples matching tupDesc.
+ *
+ * This is separate from ExecInitParallelPlan() because we can launch the
+ * worker processes and let them start doing something before we do this.
+ */
+void
+ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+                         TupleDesc tupDesc)
+{
+   int         nworkers = pei->pcxt->nworkers_launched;
+   int         i;
+
+   Assert(pei->reader == NULL);
+
+   if (nworkers > 0)
+   {
+       pei->reader = (TupleQueueReader **)
+           palloc(nworkers * sizeof(TupleQueueReader *));
+
+       for (i = 0; i < nworkers; i++)
+       {
+           shm_mq_set_handle(pei->tqueue[i],
+                             pei->pcxt->worker[i].bgwhandle);
+           pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
+                                                   tupDesc);
+       }
+   }
+}
+
 /*
  * Re-initialize the parallel executor shared memory state before launching
  * a fresh batch of workers.
@@ -616,6 +650,7 @@ ExecParallelReinitialize(PlanState *planstate,
 
    ReinitializeParallelDSM(pei->pcxt);
    pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+   pei->reader = NULL;
    pei->finished = false;
 
    /* Traverse plan tree and let each child node reset associated state. */
@@ -741,16 +776,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
 {
+   int         nworkers = pei->pcxt->nworkers_launched;
    int         i;
 
+   /* Make this be a no-op if called twice in a row. */
    if (pei->finished)
        return;
 
-   /* First, wait for the workers to finish. */
+   /*
+    * Detach from tuple queues ASAP, so that any still-active workers will
+    * notice that no further results are wanted.
+    */
+   if (pei->tqueue != NULL)
+   {
+       for (i = 0; i < nworkers; i++)
+           shm_mq_detach(pei->tqueue[i]);
+       pfree(pei->tqueue);
+       pei->tqueue = NULL;
+   }
+
+   /*
+    * While we're waiting for the workers to finish, let's get rid of the
+    * tuple queue readers.  (Any other local cleanup could be done here too.)
+    */
+   if (pei->reader != NULL)
+   {
+       for (i = 0; i < nworkers; i++)
+           DestroyTupleQueueReader(pei->reader[i]);
+       pfree(pei->reader);
+       pei->reader = NULL;
+   }
+
+   /* Now wait for the workers to finish. */
    WaitForParallelWorkersToFinish(pei->pcxt);
 
-   /* Next, accumulate buffer usage. */
-   for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
+   /*
+    * Next, accumulate buffer usage.  (This must wait for the workers to
+    * finish, or we might get incomplete data.)
+    */
+   for (i = 0; i < nworkers; i++)
        InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
    /* Finally, accumulate instrumentation, if any. */
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index d93fbacdf9ee29f72ab8b1cf50c473530c85f57b..022d75b4b852ae56a8a3b22401c838cc270cea81 100644
@@ -130,7 +130,6 @@ ExecGather(PlanState *pstate)
 {
    GatherState *node = castNode(GatherState, pstate);
    TupleTableSlot *fslot = node->funnel_slot;
-   int         i;
    TupleTableSlot *slot;
    ExprContext *econtext;
 
@@ -173,33 +172,30 @@ ExecGather(PlanState *pstate)
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;
-           node->nreaders = 0;
-           node->nextreader = 0;
 
            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
-               node->reader = palloc(pcxt->nworkers_launched *
-                                     sizeof(TupleQueueReader *));
-
-               for (i = 0; i < pcxt->nworkers_launched; ++i)
-               {
-                   shm_mq_set_handle(node->pei->tqueue[i],
-                                     pcxt->worker[i].bgwhandle);
-                   node->reader[node->nreaders++] =
-                       CreateTupleQueueReader(node->pei->tqueue[i],
-                                              fslot->tts_tupleDescriptor);
-               }
+               ExecParallelCreateReaders(node->pei,
+                                         fslot->tts_tupleDescriptor);
+               /* Make a working array showing the active readers */
+               node->nreaders = pcxt->nworkers_launched;
+               node->reader = (TupleQueueReader **)
+                   palloc(node->nreaders * sizeof(TupleQueueReader *));
+               memcpy(node->reader, node->pei->reader,
+                      node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
-               ExecShutdownGatherWorkers(node);
+               node->nreaders = 0;
+               node->reader = NULL;
            }
+           node->nextreader = 0;
        }
 
        /* Run plan locally if no workers or not single-copy. */
-       node->need_to_scan_locally = (node->reader == NULL)
+       node->need_to_scan_locally = (node->nreaders == 0)
            || !gather->single_copy;
        node->initialized = true;
    }
@@ -258,11 +254,11 @@ gather_getnext(GatherState *gatherstate)
    MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
    HeapTuple   tup;
 
-   while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
+   while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
    {
        CHECK_FOR_INTERRUPTS();
 
-       if (gatherstate->reader != NULL)
+       if (gatherstate->nreaders > 0)
        {
            MemoryContext oldContext;
 
@@ -319,19 +315,15 @@ gather_readnext(GatherState *gatherstate)
        tup = TupleQueueReaderNext(reader, true, &readerdone);
 
        /*
-        * If this reader is done, remove it, and collapse the array.  If all
-        * readers are done, clean up remaining worker state.
+        * If this reader is done, remove it from our working array of active
+        * readers.  If all readers are done, we're outta here.
         */
        if (readerdone)
        {
            Assert(!tup);
-           DestroyTupleQueueReader(reader);
            --gatherstate->nreaders;
            if (gatherstate->nreaders == 0)
-           {
-               ExecShutdownGatherWorkers(gatherstate);
                return NULL;
-           }
            memmove(&gatherstate->reader[gatherstate->nextreader],
                    &gatherstate->reader[gatherstate->nextreader + 1],
                    sizeof(TupleQueueReader *)
@@ -378,37 +370,25 @@ gather_readnext(GatherState *gatherstate)
 /* ----------------------------------------------------------------
  *     ExecShutdownGatherWorkers
  *
- *     Destroy the parallel workers.  Collect all the stats after
- *     workers are stopped, else some work done by workers won't be
- *     accounted.
+ *     Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherWorkers(GatherState *node)
 {
-   /* Shut down tuple queue readers before shutting down workers. */
-   if (node->reader != NULL)
-   {
-       int         i;
-
-       for (i = 0; i < node->nreaders; ++i)
-           DestroyTupleQueueReader(node->reader[i]);
-
-       pfree(node->reader);
-       node->reader = NULL;
-   }
-
-   /* Now shut down the workers. */
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);
+
+   /* Flush local copy of reader array */
+   if (node->reader)
+       pfree(node->reader);
+   node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
  *     ExecShutdownGather
  *
  *     Destroy the setup for parallel workers including parallel context.
- *     Collect all the stats after workers are stopped, else some work
- *     done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index b8bb4f8eb045202b4357bd8445d175a3a68e5955..d20d46606e4229ff2098fad5a359e1e74985d9cc 100644
@@ -178,7 +178,6 @@ ExecGatherMerge(PlanState *pstate)
    GatherMergeState *node = castNode(GatherMergeState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;
-   int         i;
 
    CHECK_FOR_INTERRUPTS();
 
@@ -214,27 +213,23 @@ ExecGatherMerge(PlanState *pstate)
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;
-           node->nreaders = 0;
 
            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
-               node->reader = palloc(pcxt->nworkers_launched *
-                                     sizeof(TupleQueueReader *));
-
-               for (i = 0; i < pcxt->nworkers_launched; ++i)
-               {
-                   shm_mq_set_handle(node->pei->tqueue[i],
-                                     pcxt->worker[i].bgwhandle);
-                   node->reader[node->nreaders++] =
-                       CreateTupleQueueReader(node->pei->tqueue[i],
-                                              node->tupDesc);
-               }
+               ExecParallelCreateReaders(node->pei, node->tupDesc);
+               /* Make a working array showing the active readers */
+               node->nreaders = pcxt->nworkers_launched;
+               node->reader = (TupleQueueReader **)
+                   palloc(node->nreaders * sizeof(TupleQueueReader *));
+               memcpy(node->reader, node->pei->reader,
+                      node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
-               ExecShutdownGatherMergeWorkers(node);
+               node->nreaders = 0;
+               node->reader = NULL;
            }
        }
 
@@ -284,8 +279,6 @@ ExecEndGatherMerge(GatherMergeState *node)
  *     ExecShutdownGatherMerge
  *
  *     Destroy the setup for parallel workers including parallel context.
- *     Collect all the stats after workers are stopped, else some work
- *     done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
@@ -304,30 +297,19 @@ ExecShutdownGatherMerge(GatherMergeState *node)
 /* ----------------------------------------------------------------
  *     ExecShutdownGatherMergeWorkers
  *
- *     Destroy the parallel workers.  Collect all the stats after
- *     workers are stopped, else some work done by workers won't be
- *     accounted.
+ *     Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 {
-   /* Shut down tuple queue readers before shutting down workers. */
-   if (node->reader != NULL)
-   {
-       int         i;
-
-       for (i = 0; i < node->nreaders; ++i)
-           if (node->reader[i])
-               DestroyTupleQueueReader(node->reader[i]);
-
-       pfree(node->reader);
-       node->reader = NULL;
-   }
-
-   /* Now shut down the workers. */
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);
+
+   /* Flush local copy of reader array */
+   if (node->reader)
+       pfree(node->reader);
+   node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
@@ -672,8 +654,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
    else if (tuple_buffer->done)
    {
        /* Reader is known to be exhausted. */
-       DestroyTupleQueueReader(gm_state->reader[reader - 1]);
-       gm_state->reader[reader - 1] = NULL;
        return false;
    }
    else
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index ee4bec0385e680178c3469ddd04165eee109db6e..6afcd1a30a701b001f27bec8218399a56697eb7a 100644
@@ -651,11 +651,13 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 
 /*
  * Destroy a tuple queue reader.
+ *
+ * Note: cleaning up the underlying shm_mq is the caller's responsibility.
+ * We won't access it here, as it may be detached already.
  */
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-   shm_mq_detach(reader->queue);
    if (reader->typmodmap != NULL)
        hash_destroy(reader->typmodmap);
    /* Is it worth trying to free substructure of the remap tree? */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 1cb895d898430541af1b467601f41783a20a5996..ed231f2d53f625679cc4b95660746a25483eef80 100644
@@ -23,17 +23,21 @@ typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
 
 typedef struct ParallelExecutorInfo
 {
-   PlanState  *planstate;
-   ParallelContext *pcxt;
-   BufferUsage *buffer_usage;
-   SharedExecutorInstrumentation *instrumentation;
-   shm_mq_handle **tqueue;
-   dsa_area   *area;
-   bool        finished;
+   PlanState  *planstate;      /* plan subtree we're running in parallel */
+   ParallelContext *pcxt;      /* parallel context we're using */
+   BufferUsage *buffer_usage;  /* points to bufusage area in DSM */
+   SharedExecutorInstrumentation *instrumentation; /* optional */
+   dsa_area   *area;           /* points to DSA area in DSM */
+   bool        finished;       /* set true by ExecParallelFinish */
+   /* These two arrays have pcxt->nworkers_launched entries: */
+   shm_mq_handle **tqueue;     /* tuple queues for worker output */
+   struct TupleQueueReader **reader;   /* tuple reader/writer support */
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                     EState *estate, int nworkers, int64 tuples_needed);
+extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+                         TupleDesc tupDesc);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(PlanState *planstate,