shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
pei->buffer_usage = bufusage_space;
- /* Set up tuple queues. */
+ /* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
+ /* We don't need the TupleQueueReaders yet, though. */
+ pei->reader = NULL;
+
/*
* If instrumentation options were supplied, allocate space for the data.
* It only gets partially initialized here; the rest happens during
return pei;
}
+/*
+ * Set up tuple queue readers to read the results of a parallel subplan.
+ * All the workers are expected to return tuples matching tupDesc.
+ *
+ * This is separate from ExecInitParallelPlan() because we can launch the
+ * worker processes and let them start doing something before we do this.
+ */
+void
+ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+ TupleDesc tupDesc)
+{
+ int nworkers = pei->pcxt->nworkers_launched;
+ int i;
+
+ Assert(pei->reader == NULL);
+
+ if (nworkers > 0)
+ {
+ pei->reader = (TupleQueueReader **)
+ palloc(nworkers * sizeof(TupleQueueReader *));
+
+ for (i = 0; i < nworkers; i++)
+ {
+ shm_mq_set_handle(pei->tqueue[i],
+ pei->pcxt->worker[i].bgwhandle);
+ pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
+ tupDesc);
+ }
+ }
+}
+
/*
* Re-initialize the parallel executor shared memory state before launching
* a fresh batch of workers.
ReinitializeParallelDSM(pei->pcxt);
pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+ pei->reader = NULL;
pei->finished = false;
/* Traverse plan tree and let each child node reset associated state. */
void
ExecParallelFinish(ParallelExecutorInfo *pei)
{
+ int nworkers = pei->pcxt->nworkers_launched;
int i;
+ /* Make this be a no-op if called twice in a row. */
if (pei->finished)
return;
- /* First, wait for the workers to finish. */
+ /*
+ * Detach from tuple queues ASAP, so that any still-active workers will
+ * notice that no further results are wanted.
+ */
+ if (pei->tqueue != NULL)
+ {
+ for (i = 0; i < nworkers; i++)
+ shm_mq_detach(pei->tqueue[i]);
+ pfree(pei->tqueue);
+ pei->tqueue = NULL;
+ }
+
+ /*
+ * While we're waiting for the workers to finish, let's get rid of the
+ * tuple queue readers. (Any other local cleanup could be done here too.)
+ */
+ if (pei->reader != NULL)
+ {
+ for (i = 0; i < nworkers; i++)
+ DestroyTupleQueueReader(pei->reader[i]);
+ pfree(pei->reader);
+ pei->reader = NULL;
+ }
+
+ /* Now wait for the workers to finish. */
WaitForParallelWorkersToFinish(pei->pcxt);
- /* Next, accumulate buffer usage. */
- for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
+ /*
+ * Next, accumulate buffer usage. (This must wait for the workers to
+ * finish, or we might get incomplete data.)
+ */
+ for (i = 0; i < nworkers; i++)
InstrAccumParallelQuery(&pei->buffer_usage[i]);
/* Finally, accumulate instrumentation, if any. */
{
GatherState *node = castNode(GatherState, pstate);
TupleTableSlot *fslot = node->funnel_slot;
- int i;
TupleTableSlot *slot;
ExprContext *econtext;
LaunchParallelWorkers(pcxt);
/* We save # workers launched for the benefit of EXPLAIN */
node->nworkers_launched = pcxt->nworkers_launched;
- node->nreaders = 0;
- node->nextreader = 0;
/* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0)
{
- node->reader = palloc(pcxt->nworkers_launched *
- sizeof(TupleQueueReader *));
-
- for (i = 0; i < pcxt->nworkers_launched; ++i)
- {
- shm_mq_set_handle(node->pei->tqueue[i],
- pcxt->worker[i].bgwhandle);
- node->reader[node->nreaders++] =
- CreateTupleQueueReader(node->pei->tqueue[i],
- fslot->tts_tupleDescriptor);
- }
+ ExecParallelCreateReaders(node->pei,
+ fslot->tts_tupleDescriptor);
+ /* Make a working array showing the active readers */
+ node->nreaders = pcxt->nworkers_launched;
+ node->reader = (TupleQueueReader **)
+ palloc(node->nreaders * sizeof(TupleQueueReader *));
+ memcpy(node->reader, node->pei->reader,
+ node->nreaders * sizeof(TupleQueueReader *));
}
else
{
/* No workers? Then never mind. */
- ExecShutdownGatherWorkers(node);
+ node->nreaders = 0;
+ node->reader = NULL;
}
+ node->nextreader = 0;
}
/* Run plan locally if no workers or not single-copy. */
- node->need_to_scan_locally = (node->reader == NULL)
+ node->need_to_scan_locally = (node->nreaders == 0)
|| !gather->single_copy;
node->initialized = true;
}
MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
HeapTuple tup;
- while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
+ while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
{
CHECK_FOR_INTERRUPTS();
- if (gatherstate->reader != NULL)
+ if (gatherstate->nreaders > 0)
{
MemoryContext oldContext;
tup = TupleQueueReaderNext(reader, true, &readerdone);
/*
- * If this reader is done, remove it, and collapse the array. If all
- * readers are done, clean up remaining worker state.
+ * If this reader is done, remove it from our working array of active
+ * readers. If all readers are done, we're outta here.
*/
if (readerdone)
{
Assert(!tup);
- DestroyTupleQueueReader(reader);
--gatherstate->nreaders;
if (gatherstate->nreaders == 0)
- {
- ExecShutdownGatherWorkers(gatherstate);
return NULL;
- }
memmove(&gatherstate->reader[gatherstate->nextreader],
&gatherstate->reader[gatherstate->nextreader + 1],
sizeof(TupleQueueReader *)
/* ----------------------------------------------------------------
* ExecShutdownGatherWorkers
*
- * Destroy the parallel workers. Collect all the stats after
- * workers are stopped, else some work done by workers won't be
- * accounted.
+ * Stop all the parallel workers.
* ----------------------------------------------------------------
*/
static void
ExecShutdownGatherWorkers(GatherState *node)
{
- /* Shut down tuple queue readers before shutting down workers. */
- if (node->reader != NULL)
- {
- int i;
-
- for (i = 0; i < node->nreaders; ++i)
- DestroyTupleQueueReader(node->reader[i]);
-
- pfree(node->reader);
- node->reader = NULL;
- }
-
- /* Now shut down the workers. */
if (node->pei != NULL)
ExecParallelFinish(node->pei);
+
+ /* Flush local copy of reader array */
+ if (node->reader)
+ pfree(node->reader);
+ node->reader = NULL;
}
/* ----------------------------------------------------------------
* ExecShutdownGather
*
* Destroy the setup for parallel workers including parallel context.
- * Collect all the stats after workers are stopped, else some work
- * done by workers won't be accounted.
* ----------------------------------------------------------------
*/
void
GatherMergeState *node = castNode(GatherMergeState, pstate);
TupleTableSlot *slot;
ExprContext *econtext;
- int i;
CHECK_FOR_INTERRUPTS();
LaunchParallelWorkers(pcxt);
/* We save # workers launched for the benefit of EXPLAIN */
node->nworkers_launched = pcxt->nworkers_launched;
- node->nreaders = 0;
/* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0)
{
- node->reader = palloc(pcxt->nworkers_launched *
- sizeof(TupleQueueReader *));
-
- for (i = 0; i < pcxt->nworkers_launched; ++i)
- {
- shm_mq_set_handle(node->pei->tqueue[i],
- pcxt->worker[i].bgwhandle);
- node->reader[node->nreaders++] =
- CreateTupleQueueReader(node->pei->tqueue[i],
- node->tupDesc);
- }
+ ExecParallelCreateReaders(node->pei, node->tupDesc);
+ /* Make a working array showing the active readers */
+ node->nreaders = pcxt->nworkers_launched;
+ node->reader = (TupleQueueReader **)
+ palloc(node->nreaders * sizeof(TupleQueueReader *));
+ memcpy(node->reader, node->pei->reader,
+ node->nreaders * sizeof(TupleQueueReader *));
}
else
{
/* No workers? Then never mind. */
- ExecShutdownGatherMergeWorkers(node);
+ node->nreaders = 0;
+ node->reader = NULL;
}
}
* ExecShutdownGatherMerge
*
* Destroy the setup for parallel workers including parallel context.
- * Collect all the stats after workers are stopped, else some work
- * done by workers won't be accounted.
* ----------------------------------------------------------------
*/
void
/* ----------------------------------------------------------------
* ExecShutdownGatherMergeWorkers
*
- * Destroy the parallel workers. Collect all the stats after
- * workers are stopped, else some work done by workers won't be
- * accounted.
+ * Stop all the parallel workers.
* ----------------------------------------------------------------
*/
static void
ExecShutdownGatherMergeWorkers(GatherMergeState *node)
{
- /* Shut down tuple queue readers before shutting down workers. */
- if (node->reader != NULL)
- {
- int i;
-
- for (i = 0; i < node->nreaders; ++i)
- if (node->reader[i])
- DestroyTupleQueueReader(node->reader[i]);
-
- pfree(node->reader);
- node->reader = NULL;
- }
-
- /* Now shut down the workers. */
if (node->pei != NULL)
ExecParallelFinish(node->pei);
+
+ /* Flush local copy of reader array */
+ if (node->reader)
+ pfree(node->reader);
+ node->reader = NULL;
}
/* ----------------------------------------------------------------
else if (tuple_buffer->done)
{
/* Reader is known to be exhausted. */
- DestroyTupleQueueReader(gm_state->reader[reader - 1]);
- gm_state->reader[reader - 1] = NULL;
return false;
}
else
/*
* Destroy a tuple queue reader.
+ *
+ * Note: cleaning up the underlying shm_mq is the caller's responsibility.
+ * We won't access it here, as it may be detached already.
*/
void
DestroyTupleQueueReader(TupleQueueReader *reader)
{
- shm_mq_detach(reader->queue);
if (reader->typmodmap != NULL)
hash_destroy(reader->typmodmap);
/* Is it worth trying to free substructure of the remap tree? */
typedef struct ParallelExecutorInfo
{
- PlanState *planstate;
- ParallelContext *pcxt;
- BufferUsage *buffer_usage;
- SharedExecutorInstrumentation *instrumentation;
- shm_mq_handle **tqueue;
- dsa_area *area;
- bool finished;
+ PlanState *planstate; /* plan subtree we're running in parallel */
+ ParallelContext *pcxt; /* parallel context we're using */
+ BufferUsage *buffer_usage; /* points to bufusage area in DSM */
+ SharedExecutorInstrumentation *instrumentation; /* optional */
+ dsa_area *area; /* points to DSA area in DSM */
+ bool finished; /* set true by ExecParallelFinish */
+ /* These two arrays have pcxt->nworkers_launched entries: */
+ shm_mq_handle **tqueue; /* tuple queues for worker output */
+ struct TupleQueueReader **reader; /* tuple reader/writer support */
} ParallelExecutorInfo;
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers, int64 tuples_needed);
+extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+ TupleDesc tupDesc);
extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
extern void ExecParallelReinitialize(PlanState *planstate,