granicus.if.org Git - postgresql/commitdiff
Improve division of labor between execParallel.c and nodeGather[Merge].c.
author     Tom Lane <tgl@sss.pgh.pa.us>
           Fri, 1 Sep 2017 21:38:54 +0000 (17:38 -0400)
committer  Tom Lane <tgl@sss.pgh.pa.us>
           Fri, 1 Sep 2017 21:38:54 +0000 (17:38 -0400)
Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c.  Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.

These changes also eliminate a vestigial memory leak (of the pei->tqueue
array).  It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us

src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c
src/backend/executor/nodeGatherMerge.c
src/backend/executor/tqueue.c
src/include/executor/execParallel.h
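
For orientation, here is a condensed sketch of the calling convention this commit establishes, as seen from a Gather-style node. It is distilled from the diffs below; error handling, EXPLAIN bookkeeping, and unrelated setup are omitted, and "nworkers" stands in for the planner-chosen worker count:

    /* First execution: build shared state, then launch the workers. */
    node->pei = ExecInitParallelPlan(outerPlanState(node), estate, nworkers);
    LaunchParallelWorkers(node->pei->pcxt);

    if (node->pei->pcxt->nworkers_launched > 0)
    {
        /* execParallel.c now creates (and later destroys) the readers. */
        ExecParallelCreateReaders(node->pei, fslot->tts_tupleDescriptor);

        /* The node keeps only a working copy of the active-reader array. */
        node->nreaders = node->pei->pcxt->nworkers_launched;
        node->reader = (TupleQueueReader **)
            palloc(node->nreaders * sizeof(TupleQueueReader *));
        memcpy(node->reader, node->pei->reader,
               node->nreaders * sizeof(TupleQueueReader *));
    }

    /* Shutdown: ExecParallelFinish() detaches the tuple queues, destroys
     * the readers, and waits for the workers; the node just frees its
     * local copy of the reader array. */
    ExecParallelFinish(node->pei);
    if (node->reader)
        pfree(node->reader);
    node->reader = NULL;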

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 2313b4c45cbd27538b7a6a615faa8730eeaa3a16..7dda399daf351fc7e5297b985ec2cc42f11c244d 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -498,9 +498,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
        pei->buffer_usage = bufusage_space;
 
-       /* Set up tuple queues. */
+       /* Set up the tuple queues that the workers will write into. */
        pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
+       /* We don't need the TupleQueueReaders yet, though. */
+       pei->reader = NULL;
+
        /*
         * If instrumentation options were supplied, allocate space for the data.
         * It only gets partially initialized here; the rest happens during
@@ -567,6 +570,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        return pei;
 }
 
+/*
+ * Set up tuple queue readers to read the results of a parallel subplan.
+ * All the workers are expected to return tuples matching tupDesc.
+ *
+ * This is separate from ExecInitParallelPlan() because we can launch the
+ * worker processes and let them start doing something before we do this.
+ */
+void
+ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+                                                 TupleDesc tupDesc)
+{
+       int                     nworkers = pei->pcxt->nworkers_launched;
+       int                     i;
+
+       Assert(pei->reader == NULL);
+
+       if (nworkers > 0)
+       {
+               pei->reader = (TupleQueueReader **)
+                       palloc(nworkers * sizeof(TupleQueueReader *));
+
+               for (i = 0; i < nworkers; i++)
+               {
+                       shm_mq_set_handle(pei->tqueue[i],
+                                                         pei->pcxt->worker[i].bgwhandle);
+                       pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
+                                                                                                       tupDesc);
+               }
+       }
+}
+
 /*
  * Re-initialize the parallel executor shared memory state before launching
  * a fresh batch of workers.
@@ -580,6 +614,7 @@ ExecParallelReinitialize(PlanState *planstate,
 
        ReinitializeParallelDSM(pei->pcxt);
        pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+       pei->reader = NULL;
        pei->finished = false;
 
        /* Traverse plan tree and let each child node reset associated state. */
@@ -691,16 +726,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
 {
+       int                     nworkers = pei->pcxt->nworkers_launched;
        int                     i;
 
+       /* Make this be a no-op if called twice in a row. */
        if (pei->finished)
                return;
 
-       /* First, wait for the workers to finish. */
+       /*
+        * Detach from tuple queues ASAP, so that any still-active workers will
+        * notice that no further results are wanted.
+        */
+       if (pei->tqueue != NULL)
+       {
+               for (i = 0; i < nworkers; i++)
+                       shm_mq_detach(pei->tqueue[i]);
+               pfree(pei->tqueue);
+               pei->tqueue = NULL;
+       }
+
+       /*
+        * While we're waiting for the workers to finish, let's get rid of the
+        * tuple queue readers.  (Any other local cleanup could be done here too.)
+        */
+       if (pei->reader != NULL)
+       {
+               for (i = 0; i < nworkers; i++)
+                       DestroyTupleQueueReader(pei->reader[i]);
+               pfree(pei->reader);
+               pei->reader = NULL;
+       }
+
+       /* Now wait for the workers to finish. */
        WaitForParallelWorkersToFinish(pei->pcxt);
 
-       /* Next, accumulate buffer usage. */
-       for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
+       /*
+        * Next, accumulate buffer usage.  (This must wait for the workers to
+        * finish, or we might get incomplete data.)
+        */
+       for (i = 0; i < nworkers; i++)
                InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
        /* Finally, accumulate instrumentation, if any. */
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 94b6ae5c30005d2f56f815bdffc2f3e75a25f5ee..3be735f5c3393388ea71d462ae519c86769104bc 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -129,7 +129,6 @@ ExecGather(PlanState *pstate)
 {
        GatherState *node = castNode(GatherState, pstate);
        TupleTableSlot *fslot = node->funnel_slot;
-       int                     i;
        TupleTableSlot *slot;
        ExprContext *econtext;
 
@@ -171,33 +170,30 @@ ExecGather(PlanState *pstate)
                        LaunchParallelWorkers(pcxt);
                        /* We save # workers launched for the benefit of EXPLAIN */
                        node->nworkers_launched = pcxt->nworkers_launched;
-                       node->nreaders = 0;
-                       node->nextreader = 0;
 
                        /* Set up tuple queue readers to read the results. */
                        if (pcxt->nworkers_launched > 0)
                        {
-                               node->reader = palloc(pcxt->nworkers_launched *
-                                                                         sizeof(TupleQueueReader *));
-
-                               for (i = 0; i < pcxt->nworkers_launched; ++i)
-                               {
-                                       shm_mq_set_handle(node->pei->tqueue[i],
-                                                                         pcxt->worker[i].bgwhandle);
-                                       node->reader[node->nreaders++] =
-                                               CreateTupleQueueReader(node->pei->tqueue[i],
-                                                                                          fslot->tts_tupleDescriptor);
-                               }
+                               ExecParallelCreateReaders(node->pei,
+                                                                                 fslot->tts_tupleDescriptor);
+                               /* Make a working array showing the active readers */
+                               node->nreaders = pcxt->nworkers_launched;
+                               node->reader = (TupleQueueReader **)
+                                       palloc(node->nreaders * sizeof(TupleQueueReader *));
+                               memcpy(node->reader, node->pei->reader,
+                                          node->nreaders * sizeof(TupleQueueReader *));
                        }
                        else
                        {
                                /* No workers?  Then never mind. */
-                               ExecShutdownGatherWorkers(node);
+                               node->nreaders = 0;
+                               node->reader = NULL;
                        }
+                       node->nextreader = 0;
                }
 
                /* Run plan locally if no workers or not single-copy. */
-               node->need_to_scan_locally = (node->reader == NULL)
+               node->need_to_scan_locally = (node->nreaders == 0)
                        || !gather->single_copy;
                node->initialized = true;
        }
@@ -256,11 +252,11 @@ gather_getnext(GatherState *gatherstate)
        MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
        HeapTuple       tup;
 
-       while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
+       while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
        {
                CHECK_FOR_INTERRUPTS();
 
-               if (gatherstate->reader != NULL)
+               if (gatherstate->nreaders > 0)
                {
                        MemoryContext oldContext;
 
@@ -317,19 +313,15 @@ gather_readnext(GatherState *gatherstate)
                tup = TupleQueueReaderNext(reader, true, &readerdone);
 
                /*
-                * If this reader is done, remove it, and collapse the array.  If all
-                * readers are done, clean up remaining worker state.
+                * If this reader is done, remove it from our working array of active
+                * readers.  If all readers are done, we're outta here.
                 */
                if (readerdone)
                {
                        Assert(!tup);
-                       DestroyTupleQueueReader(reader);
                        --gatherstate->nreaders;
                        if (gatherstate->nreaders == 0)
-                       {
-                               ExecShutdownGatherWorkers(gatherstate);
                                return NULL;
-                       }
                        memmove(&gatherstate->reader[gatherstate->nextreader],
                                        &gatherstate->reader[gatherstate->nextreader + 1],
                                        sizeof(TupleQueueReader *)
@@ -376,37 +368,25 @@ gather_readnext(GatherState *gatherstate)
 /* ----------------------------------------------------------------
  *             ExecShutdownGatherWorkers
  *
- *             Destroy the parallel workers.  Collect all the stats after
- *             workers are stopped, else some work done by workers won't be
- *             accounted.
+ *             Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherWorkers(GatherState *node)
 {
-       /* Shut down tuple queue readers before shutting down workers. */
-       if (node->reader != NULL)
-       {
-               int                     i;
-
-               for (i = 0; i < node->nreaders; ++i)
-                       DestroyTupleQueueReader(node->reader[i]);
-
-               pfree(node->reader);
-               node->reader = NULL;
-       }
-
-       /* Now shut down the workers. */
        if (node->pei != NULL)
                ExecParallelFinish(node->pei);
+
+       /* Flush local copy of reader array */
+       if (node->reader)
+               pfree(node->reader);
+       node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
  *             ExecShutdownGather
  *
  *             Destroy the setup for parallel workers including parallel context.
- *             Collect all the stats after workers are stopped, else some work
- *             done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index f67fbe717603ec5e195aee07d221079633c97d69..c0c285bd611647c26e1e2e477b1e8841580884fd 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -177,7 +177,6 @@ ExecGatherMerge(PlanState *pstate)
        GatherMergeState *node = castNode(GatherMergeState, pstate);
        TupleTableSlot *slot;
        ExprContext *econtext;
-       int                     i;
 
        CHECK_FOR_INTERRUPTS();
 
@@ -212,27 +211,23 @@ ExecGatherMerge(PlanState *pstate)
                        LaunchParallelWorkers(pcxt);
                        /* We save # workers launched for the benefit of EXPLAIN */
                        node->nworkers_launched = pcxt->nworkers_launched;
-                       node->nreaders = 0;
 
                        /* Set up tuple queue readers to read the results. */
                        if (pcxt->nworkers_launched > 0)
                        {
-                               node->reader = palloc(pcxt->nworkers_launched *
-                                                                         sizeof(TupleQueueReader *));
-
-                               for (i = 0; i < pcxt->nworkers_launched; ++i)
-                               {
-                                       shm_mq_set_handle(node->pei->tqueue[i],
-                                                                         pcxt->worker[i].bgwhandle);
-                                       node->reader[node->nreaders++] =
-                                               CreateTupleQueueReader(node->pei->tqueue[i],
-                                                                                          node->tupDesc);
-                               }
+                               ExecParallelCreateReaders(node->pei, node->tupDesc);
+                               /* Make a working array showing the active readers */
+                               node->nreaders = pcxt->nworkers_launched;
+                               node->reader = (TupleQueueReader **)
+                                       palloc(node->nreaders * sizeof(TupleQueueReader *));
+                               memcpy(node->reader, node->pei->reader,
+                                          node->nreaders * sizeof(TupleQueueReader *));
                        }
                        else
                        {
                                /* No workers?  Then never mind. */
-                               ExecShutdownGatherMergeWorkers(node);
+                               node->nreaders = 0;
+                               node->reader = NULL;
                        }
                }
 
@@ -282,8 +277,6 @@ ExecEndGatherMerge(GatherMergeState *node)
  *             ExecShutdownGatherMerge
  *
  *             Destroy the setup for parallel workers including parallel context.
- *             Collect all the stats after workers are stopped, else some work
- *             done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
@@ -302,30 +295,19 @@ ExecShutdownGatherMerge(GatherMergeState *node)
 /* ----------------------------------------------------------------
  *             ExecShutdownGatherMergeWorkers
  *
- *             Destroy the parallel workers.  Collect all the stats after
- *             workers are stopped, else some work done by workers won't be
- *             accounted.
+ *             Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 {
-       /* Shut down tuple queue readers before shutting down workers. */
-       if (node->reader != NULL)
-       {
-               int                     i;
-
-               for (i = 0; i < node->nreaders; ++i)
-                       if (node->reader[i])
-                               DestroyTupleQueueReader(node->reader[i]);
-
-               pfree(node->reader);
-               node->reader = NULL;
-       }
-
-       /* Now shut down the workers. */
        if (node->pei != NULL)
                ExecParallelFinish(node->pei);
+
+       /* Flush local copy of reader array */
+       if (node->reader)
+               pfree(node->reader);
+       node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
@@ -670,8 +652,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
        else if (tuple_buffer->done)
        {
                /* Reader is known to be exhausted. */
-               DestroyTupleQueueReader(gm_state->reader[reader - 1]);
-               gm_state->reader[reader - 1] = NULL;
                return false;
        }
        else
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 4339203085d254d83b97d6a9c6f957cfafb18732..42bf57a2ab8c7c801a48cc81c6b84b55f99021a8 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -651,11 +651,13 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 
 /*
  * Destroy a tuple queue reader.
+ *
+ * Note: cleaning up the underlying shm_mq is the caller's responsibility.
+ * We won't access it here, as it may be detached already.
  */
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-       shm_mq_detach(reader->queue);
        if (reader->typmodmap != NULL)
                hash_destroy(reader->typmodmap);
        /* Is it worth trying to free substructure of the remap tree? */
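
With this change, DestroyTupleQueueReader() no longer touches the underlying shm_mq, so whoever tears a reader down must also detach the queue. A minimal sketch of that pattern, matching what ExecParallelFinish() above now does (the only such caller after this commit):

    /* Detach the queues first, so still-active workers notice that no
     * further results are wanted ... */
    for (i = 0; i < nworkers; i++)
        shm_mq_detach(pei->tqueue[i]);

    /* ... then destroy the readers; they no longer reference the
     * (possibly already detached) queues. */
    for (i = 0; i < nworkers; i++)
        DestroyTupleQueueReader(pei->reader[i]);
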
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index a6512245b15d5104a2ad2ff0b2e95778ee2fc776..8b714193c57aa23933d471384cf1fbe065034469 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -23,17 +23,21 @@ typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
 
 typedef struct ParallelExecutorInfo
 {
-       PlanState  *planstate;
-       ParallelContext *pcxt;
-       BufferUsage *buffer_usage;
-       SharedExecutorInstrumentation *instrumentation;
-       shm_mq_handle **tqueue;
-       dsa_area   *area;
-       bool            finished;
+       PlanState  *planstate;          /* plan subtree we're running in parallel */
+       ParallelContext *pcxt;          /* parallel context we're using */
+       BufferUsage *buffer_usage;      /* points to bufusage area in DSM */
+       SharedExecutorInstrumentation *instrumentation; /* optional */
+       dsa_area   *area;                       /* points to DSA area in DSM */
+       bool            finished;               /* set true by ExecParallelFinish */
+       /* These two arrays have pcxt->nworkers_launched entries: */
+       shm_mq_handle **tqueue;         /* tuple queues for worker output */
+       struct TupleQueueReader **reader;       /* tuple reader/writer support */
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                                         EState *estate, int nworkers);
+extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+                                                 TupleDesc tupDesc);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(PlanState *planstate,