Avoid aggregating worker instrumentation multiple times.
authorRobert Haas <rhaas@postgresql.org>
Wed, 18 Nov 2015 17:35:25 +0000 (12:35 -0500)
committerRobert Haas <rhaas@postgresql.org>
Wed, 18 Nov 2015 17:35:25 +0000 (12:35 -0500)
Amit Kapila, per design ideas from me.

src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c
src/include/executor/execParallel.h

index eae13c5647752c79a42eff96406ed50b014edae5..6730037710912edc917913be650a820cfe6c2a91 100644 (file)
@@ -277,13 +277,15 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 }
 
 /*
- * Re-initialize the response queues for backend workers to return tuples
- * to the main backend and start the workers.
+ * Re-initialize the parallel executor info such that it can be reused by
+ * workers.
  */
-shm_mq_handle **
-ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+void
+ExecParallelReinitialize(ParallelExecutorInfo *pei)
 {
-   return ExecParallelSetupTupleQueues(pcxt, true);
+   ReinitializeParallelDSM(pei->pcxt);
+   pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+   pei->finished = false;
 }
 
 /*
@@ -308,6 +310,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 
    /* Allocate object for return value. */
    pei = palloc0(sizeof(ParallelExecutorInfo));
+   pei->finished = false;
    pei->planstate = planstate;
 
    /* Fix up and serialize plan to be sent to workers. */
@@ -469,6 +472,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 {
    int     i;
 
+   if (pei->finished)
+       return;
+
    /* First, wait for the workers to finish. */
    WaitForParallelWorkersToFinish(pei->pcxt);
 
@@ -480,6 +486,8 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
    if (pei->instrumentation)
        ExecParallelRetrieveInstrumentation(pei->planstate,
                                            pei->instrumentation);
+
+   pei->finished = true;
 }
 
 /*
index b368b48d01d839c76b6a871adc44595ddcc9aa96..b6e82d1664f83dedcdf1e1eb8e8e35071ffcff76 100644 (file)
@@ -456,11 +456,7 @@ ExecReScanGather(GatherState *node)
    node->initialized = false;
 
    if (node->pei)
-   {
-       ReinitializeParallelDSM(node->pei->pcxt);
-       node->pei->tqueue =
-               ExecParallelReinitializeTupleQueues(node->pei->pcxt);
-   }
+       ExecParallelReinitialize(node->pei);
 
    ExecReScan(node->ps.lefttree);
 }
index 23c29ebb9027de0c40794d82b951e09eda5d0958..b43af1dd2b3c220110d4d819ca61e207b541a0c4 100644 (file)
@@ -27,12 +27,13 @@ typedef struct ParallelExecutorInfo
    BufferUsage *buffer_usage;
    SharedExecutorInstrumentation *instrumentation;
    shm_mq_handle **tqueue;
+   bool    finished;
 }  ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                     EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
-extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);
+extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
 
 #endif   /* EXECPARALLEL_H */