From 01edb5c7fc3bcf6aea15f2b3be36189b52ad9d1a Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Fri, 1 Sep 2017 17:38:54 -0400 Subject: [PATCH] Improve division of labor between execParallel.c and nodeGather[Merge].c. Move the responsibility for creating/destroying TupleQueueReaders into execParallel.c, to avoid duplicative coding in nodeGather.c and nodeGatherMerge.c. Also, instead of having DestroyTupleQueueReader do shm_mq_detach, do it in the caller (which is now only ExecParallelFinish). This means execParallel.c does both the attaching and detaching of the tuple-queue-reader shm_mqs, which seems less weird than the previous arrangement. These changes also eliminate a vestigial memory leak (of the pei->tqueue array). It's now demonstrable that rescans of Gather or GatherMerge don't leak memory. Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us --- src/backend/executor/execParallel.c | 72 ++++++++++++++++++++++++-- src/backend/executor/nodeGather.c | 64 ++++++++--------------- src/backend/executor/nodeGatherMerge.c | 50 ++++++------------ src/backend/executor/tqueue.c | 4 +- src/include/executor/execParallel.h | 18 ++++--- 5 files changed, 119 insertions(+), 89 deletions(-) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 2313b4c45c..7dda399daf 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -498,9 +498,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); pei->buffer_usage = bufusage_space; - /* Set up tuple queues. */ + /* Set up the tuple queues that the workers will write into. */ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); + /* We don't need the TupleQueueReaders yet, though. */ + pei->reader = NULL; + /* * If instrumentation options were supplied, allocate space for the data. * It only gets partially initialized here; the rest happens during @@ -567,6 +570,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) return pei; } +/* + * Set up tuple queue readers to read the results of a parallel subplan. + * All the workers are expected to return tuples matching tupDesc. + * + * This is separate from ExecInitParallelPlan() because we can launch the + * worker processes and let them start doing something before we do this. + */ +void +ExecParallelCreateReaders(ParallelExecutorInfo *pei, + TupleDesc tupDesc) +{ + int nworkers = pei->pcxt->nworkers_launched; + int i; + + Assert(pei->reader == NULL); + + if (nworkers > 0) + { + pei->reader = (TupleQueueReader **) + palloc(nworkers * sizeof(TupleQueueReader *)); + + for (i = 0; i < nworkers; i++) + { + shm_mq_set_handle(pei->tqueue[i], + pei->pcxt->worker[i].bgwhandle); + pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i], + tupDesc); + } + } +} + /* * Re-initialize the parallel executor shared memory state before launching * a fresh batch of workers. @@ -580,6 +614,7 @@ ExecParallelReinitialize(PlanState *planstate, ReinitializeParallelDSM(pei->pcxt); pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); + pei->reader = NULL; pei->finished = false; /* Traverse plan tree and let each child node reset associated state. */ @@ -691,16 +726,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, void ExecParallelFinish(ParallelExecutorInfo *pei) { + int nworkers = pei->pcxt->nworkers_launched; int i; + /* Make this be a no-op if called twice in a row. */ if (pei->finished) return; - /* First, wait for the workers to finish. */ + /* + * Detach from tuple queues ASAP, so that any still-active workers will + * notice that no further results are wanted. + */ + if (pei->tqueue != NULL) + { + for (i = 0; i < nworkers; i++) + shm_mq_detach(pei->tqueue[i]); + pfree(pei->tqueue); + pei->tqueue = NULL; + } + + /* + * While we're waiting for the workers to finish, let's get rid of the + * tuple queue readers. (Any other local cleanup could be done here too.) + */ + if (pei->reader != NULL) + { + for (i = 0; i < nworkers; i++) + DestroyTupleQueueReader(pei->reader[i]); + pfree(pei->reader); + pei->reader = NULL; + } + + /* Now wait for the workers to finish. */ WaitForParallelWorkersToFinish(pei->pcxt); - /* Next, accumulate buffer usage. */ - for (i = 0; i < pei->pcxt->nworkers_launched; ++i) + /* + * Next, accumulate buffer usage. (This must wait for the workers to + * finish, or we might get incomplete data.) + */ + for (i = 0; i < nworkers; i++) InstrAccumParallelQuery(&pei->buffer_usage[i]); /* Finally, accumulate instrumentation, if any. */ diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 94b6ae5c30..3be735f5c3 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -129,7 +129,6 @@ ExecGather(PlanState *pstate) { GatherState *node = castNode(GatherState, pstate); TupleTableSlot *fslot = node->funnel_slot; - int i; TupleTableSlot *slot; ExprContext *econtext; @@ -171,33 +170,30 @@ ExecGather(PlanState *pstate) LaunchParallelWorkers(pcxt); /* We save # workers launched for the benefit of EXPLAIN */ node->nworkers_launched = pcxt->nworkers_launched; - node->nreaders = 0; - node->nextreader = 0; /* Set up tuple queue readers to read the results. */ if (pcxt->nworkers_launched > 0) { - node->reader = palloc(pcxt->nworkers_launched * - sizeof(TupleQueueReader *)); - - for (i = 0; i < pcxt->nworkers_launched; ++i) - { - shm_mq_set_handle(node->pei->tqueue[i], - pcxt->worker[i].bgwhandle); - node->reader[node->nreaders++] = - CreateTupleQueueReader(node->pei->tqueue[i], - fslot->tts_tupleDescriptor); - } + ExecParallelCreateReaders(node->pei, + fslot->tts_tupleDescriptor); + /* Make a working array showing the active readers */ + node->nreaders = pcxt->nworkers_launched; + node->reader = (TupleQueueReader **) + palloc(node->nreaders * sizeof(TupleQueueReader *)); + memcpy(node->reader, node->pei->reader, + node->nreaders * sizeof(TupleQueueReader *)); } else { /* No workers? Then never mind. */ - ExecShutdownGatherWorkers(node); + node->nreaders = 0; + node->reader = NULL; } + node->nextreader = 0; } /* Run plan locally if no workers or not single-copy. */ - node->need_to_scan_locally = (node->reader == NULL) + node->need_to_scan_locally = (node->nreaders == 0) || !gather->single_copy; node->initialized = true; } @@ -256,11 +252,11 @@ gather_getnext(GatherState *gatherstate) MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory; HeapTuple tup; - while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) + while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally) { CHECK_FOR_INTERRUPTS(); - if (gatherstate->reader != NULL) + if (gatherstate->nreaders > 0) { MemoryContext oldContext; @@ -317,19 +313,15 @@ gather_readnext(GatherState *gatherstate) tup = TupleQueueReaderNext(reader, true, &readerdone); /* - * If this reader is done, remove it, and collapse the array. If all - * readers are done, clean up remaining worker state. + * If this reader is done, remove it from our working array of active + * readers. If all readers are done, we're outta here. */ if (readerdone) { Assert(!tup); - DestroyTupleQueueReader(reader); --gatherstate->nreaders; if (gatherstate->nreaders == 0) - { - ExecShutdownGatherWorkers(gatherstate); return NULL; - } memmove(&gatherstate->reader[gatherstate->nextreader], &gatherstate->reader[gatherstate->nextreader + 1], sizeof(TupleQueueReader *) @@ -376,37 +368,25 @@ gather_readnext(GatherState *gatherstate) /* ---------------------------------------------------------------- * ExecShutdownGatherWorkers * - * Destroy the parallel workers. Collect all the stats after - * workers are stopped, else some work done by workers won't be - * accounted. + * Stop all the parallel workers. * ---------------------------------------------------------------- */ static void ExecShutdownGatherWorkers(GatherState *node) { - /* Shut down tuple queue readers before shutting down workers. */ - if (node->reader != NULL) - { - int i; - - for (i = 0; i < node->nreaders; ++i) - DestroyTupleQueueReader(node->reader[i]); - - pfree(node->reader); - node->reader = NULL; - } - - /* Now shut down the workers. */ if (node->pei != NULL) ExecParallelFinish(node->pei); + + /* Flush local copy of reader array */ + if (node->reader) + pfree(node->reader); + node->reader = NULL; } /* ---------------------------------------------------------------- * ExecShutdownGather * * Destroy the setup for parallel workers including parallel context. - * Collect all the stats after workers are stopped, else some work - * done by workers won't be accounted. * ---------------------------------------------------------------- */ void diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index f67fbe7176..c0c285bd61 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -177,7 +177,6 @@ ExecGatherMerge(PlanState *pstate) GatherMergeState *node = castNode(GatherMergeState, pstate); TupleTableSlot *slot; ExprContext *econtext; - int i; CHECK_FOR_INTERRUPTS(); @@ -212,27 +211,23 @@ ExecGatherMerge(PlanState *pstate) LaunchParallelWorkers(pcxt); /* We save # workers launched for the benefit of EXPLAIN */ node->nworkers_launched = pcxt->nworkers_launched; - node->nreaders = 0; /* Set up tuple queue readers to read the results. */ if (pcxt->nworkers_launched > 0) { - node->reader = palloc(pcxt->nworkers_launched * - sizeof(TupleQueueReader *)); - - for (i = 0; i < pcxt->nworkers_launched; ++i) - { - shm_mq_set_handle(node->pei->tqueue[i], - pcxt->worker[i].bgwhandle); - node->reader[node->nreaders++] = - CreateTupleQueueReader(node->pei->tqueue[i], - node->tupDesc); - } + ExecParallelCreateReaders(node->pei, node->tupDesc); + /* Make a working array showing the active readers */ + node->nreaders = pcxt->nworkers_launched; + node->reader = (TupleQueueReader **) + palloc(node->nreaders * sizeof(TupleQueueReader *)); + memcpy(node->reader, node->pei->reader, + node->nreaders * sizeof(TupleQueueReader *)); } else { /* No workers? Then never mind. */ - ExecShutdownGatherMergeWorkers(node); + node->nreaders = 0; + node->reader = NULL; } } @@ -282,8 +277,6 @@ ExecEndGatherMerge(GatherMergeState *node) * ExecShutdownGatherMerge * * Destroy the setup for parallel workers including parallel context. - * Collect all the stats after workers are stopped, else some work - * done by workers won't be accounted. * ---------------------------------------------------------------- */ void @@ -302,30 +295,19 @@ ExecShutdownGatherMerge(GatherMergeState *node) /* ---------------------------------------------------------------- * ExecShutdownGatherMergeWorkers * - * Destroy the parallel workers. Collect all the stats after - * workers are stopped, else some work done by workers won't be - * accounted. + * Stop all the parallel workers. * ---------------------------------------------------------------- */ static void ExecShutdownGatherMergeWorkers(GatherMergeState *node) { - /* Shut down tuple queue readers before shutting down workers. */ - if (node->reader != NULL) - { - int i; - - for (i = 0; i < node->nreaders; ++i) - if (node->reader[i]) - DestroyTupleQueueReader(node->reader[i]); - - pfree(node->reader); - node->reader = NULL; - } - - /* Now shut down the workers. */ if (node->pei != NULL) ExecParallelFinish(node->pei); + + /* Flush local copy of reader array */ + if (node->reader) + pfree(node->reader); + node->reader = NULL; } /* ---------------------------------------------------------------- @@ -670,8 +652,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) else if (tuple_buffer->done) { /* Reader is known to be exhausted. */ - DestroyTupleQueueReader(gm_state->reader[reader - 1]); - gm_state->reader[reader - 1] = NULL; return false; } else diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index 4339203085..42bf57a2ab 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -651,11 +651,13 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc) /* * Destroy a tuple queue reader. + * + * Note: cleaning up the underlying shm_mq is the caller's responsibility. + * We won't access it here, as it may be detached already. */ void DestroyTupleQueueReader(TupleQueueReader *reader) { - shm_mq_detach(reader->queue); if (reader->typmodmap != NULL) hash_destroy(reader->typmodmap); /* Is it worth trying to free substructure of the remap tree? */ diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index a6512245b1..8b714193c5 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -23,17 +23,21 @@ typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation; typedef struct ParallelExecutorInfo { - PlanState *planstate; - ParallelContext *pcxt; - BufferUsage *buffer_usage; - SharedExecutorInstrumentation *instrumentation; - shm_mq_handle **tqueue; - dsa_area *area; - bool finished; + PlanState *planstate; /* plan subtree we're running in parallel */ + ParallelContext *pcxt; /* parallel context we're using */ + BufferUsage *buffer_usage; /* points to bufusage area in DSM */ + SharedExecutorInstrumentation *instrumentation; /* optional */ + dsa_area *area; /* points to DSA area in DSM */ + bool finished; /* set true by ExecParallelFinish */ + /* These two arrays have pcxt->nworkers_launched entries: */ + shm_mq_handle **tqueue; /* tuple queues for worker output */ + struct TupleQueueReader **reader; /* tuple reader/writer support */ } ParallelExecutorInfo; extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers); +extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei, + TupleDesc tupDesc); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelReinitialize(PlanState *planstate, -- 2.40.0