}
/*
- * Re-initialize the response queues for backend workers to return tuples
- * to the main backend and start the workers.
+ * Re-initialize the parallel executor info so that the same plan can be
+ * executed again by a fresh batch of workers.
*/
-shm_mq_handle **
-ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+void
+ExecParallelReinitialize(ParallelExecutorInfo *pei)
{
- return ExecParallelSetupTupleQueues(pcxt, true);
+ ReinitializeParallelDSM(pei->pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+ pei->finished = false;
}
/*
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
+ pei->finished = false;
pei->planstate = planstate;
/* Fix up and serialize plan to be sent to workers. */
{
int i;
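+
+ /* If we already finished, repeat calls are a no-op. */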
+ if (pei->finished)
+ return;
+
/* First, wait for the workers to finish. */
WaitForParallelWorkersToFinish(pei->pcxt);
if (pei->instrumentation)
ExecParallelRetrieveInstrumentation(pei->planstate,
pei->instrumentation);
+
+ pei->finished = true;
}
/*
node->initialized = false;
if (node->pei)
- {
- ReinitializeParallelDSM(node->pei->pcxt);
- node->pei->tqueue =
- ExecParallelReinitializeTupleQueues(node->pei->pcxt);
- }
+ ExecParallelReinitialize(node->pei);
ExecReScan(node->ps.lefttree);
}
BufferUsage *buffer_usage;
SharedExecutorInstrumentation *instrumentation;
shm_mq_handle **tqueue;
+ bool finished; /* set true once ExecParallelFinish has run */
} ParallelExecutorInfo;
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers);
extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
-extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);
+extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
#endif /* EXECPARALLEL_H */
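
Taken together, the hunks above fold DSM re-initialization, tuple-queue setup, and the new finished flag into a single entry point, ExecParallelReinitialize(), and make ExecParallelFinish() safe to call more than once. Below is a minimal sketch of the intended lifecycle; the driver function run_parallel_plan_twice and the worker count are hypothetical illustrations, not part of the patch, and the flow mirrors what a rescan of a Gather node does.

    #include "postgres.h"
    #include "access/parallel.h"
    #include "executor/execParallel.h"

    /* Hypothetical driver: run a parallel plan, rescan it once, clean up. */
    static void
    run_parallel_plan_twice(PlanState *planstate, EState *estate)
    {
        /* Build shared memory, instrumentation, and tuple queues once. */
        ParallelExecutorInfo *pei = ExecInitParallelPlan(planstate, estate, 2);

        LaunchParallelWorkers(pei->pcxt);
        /* ... drain tuples from pei->tqueue until the workers are done ... */
        ExecParallelFinish(pei);    /* pei->finished now guards repeat calls */

        /* Rescan: reset shared state, then launch a fresh batch of workers. */
        ExecParallelReinitialize(pei);
        LaunchParallelWorkers(pei->pcxt);
        /* ... drain tuples again ... */
        ExecParallelFinish(pei);

        ExecParallelCleanup(pei);
    }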