]> granicus.if.org Git - postgresql/commitdiff
Rewrite interaction of parallel mode with parallel executor support.
authorRobert Haas <rhaas@postgresql.org>
Fri, 16 Oct 2015 15:56:02 +0000 (11:56 -0400)
committerRobert Haas <rhaas@postgresql.org>
Fri, 16 Oct 2015 15:56:02 +0000 (11:56 -0400)
In the previous coding, before returning from ExecutorRun, we'd shut
down all parallel workers.  This was dead wrong if ExecutorRun was
called with a non-zero tuple count; it had the effect of truncating
the query output.  To fix, give ExecutePlan control over whether to
enter parallel mode, and have it refuse to do so if the tuple count
is non-zero.  Rewrite the Gather logic so that it can cope with being
called outside parallel mode.

Commit 7aea8e4f2daa4b39ca9d1309a0c4aadb0f7ed81b is largely to blame
for this problem, though this patch modifies some subsequently-committed
code which relied on the guarantees it purported to make.

src/backend/executor/execMain.c
src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c
src/include/executor/execParallel.h
src/include/nodes/execnodes.h

index 37b7bbd413b3c629d95d2e27f0e9ea01adfebf04..a55022e0a8029b36537c378e7a6f00a3aa6ac503 100644 (file)
@@ -76,6 +76,7 @@ static void CheckValidRowMarkRel(Relation rel, RowMarkType markType);
 static void ExecPostprocessPlan(EState *estate);
 static void ExecEndPlan(PlanState *planstate, EState *estate);
 static void ExecutePlan(EState *estate, PlanState *planstate,
+                       bool use_parallel_mode,
                        CmdType operation,
                        bool sendTuples,
                        long numberTuples,
@@ -243,11 +244,6 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
        if (!(eflags & (EXEC_FLAG_SKIP_TRIGGERS | EXEC_FLAG_EXPLAIN_ONLY)))
                AfterTriggerBeginQuery();
 
-       /* Enter parallel mode, if required by the query. */
-       if (queryDesc->plannedstmt->parallelModeNeeded &&
-               !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
-               EnterParallelMode();
-
        MemoryContextSwitchTo(oldcontext);
 }
 
@@ -341,15 +337,13 @@ standard_ExecutorRun(QueryDesc *queryDesc,
        if (!ScanDirectionIsNoMovement(direction))
                ExecutePlan(estate,
                                        queryDesc->planstate,
+                                       queryDesc->plannedstmt->parallelModeNeeded,
                                        operation,
                                        sendTuples,
                                        count,
                                        direction,
                                        dest);
 
-       /* Allow nodes to release or shut down resources. */
-       (void) ExecShutdownNode(queryDesc->planstate);
-
        /*
         * shutdown tuple receiver, if we started it
         */
@@ -482,11 +476,6 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
         */
        MemoryContextSwitchTo(oldcontext);
 
-       /* Exit parallel mode, if it was required by the query. */
-       if (queryDesc->plannedstmt->parallelModeNeeded &&
-               !(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY))
-               ExitParallelMode();
-
        /*
         * Release EState and per-query memory context.  This should release
         * everything the executor has allocated.
@@ -1529,6 +1518,7 @@ ExecEndPlan(PlanState *planstate, EState *estate)
 static void
 ExecutePlan(EState *estate,
                        PlanState *planstate,
+                       bool use_parallel_mode,
                        CmdType operation,
                        bool sendTuples,
                        long numberTuples,
@@ -1548,6 +1538,20 @@ ExecutePlan(EState *estate,
         */
        estate->es_direction = direction;
 
+       /*
+        * If a tuple count was supplied, we must force the plan to run without
+        * parallelism, because we might exit early.
+        */
+       if (numberTuples != 0)
+               use_parallel_mode = false;
+
+       /*
+        * If a tuple count was supplied, we must force the plan to run without
+        * parallelism, because we might exit early.
+        */
+       if (use_parallel_mode)
+               EnterParallelMode();
+
        /*
         * Loop until we've processed the proper number of tuples from the plan.
         */
@@ -1566,7 +1570,11 @@ ExecutePlan(EState *estate,
                 * process so we just end the loop...
                 */
                if (TupIsNull(slot))
+               {
+                       /* Allow nodes to release or shut down resources. */
+                       (void) ExecShutdownNode(planstate);
                        break;
+               }
 
                /*
                 * If we have a junk filter, then project a new tuple with the junk
@@ -1603,6 +1611,9 @@ ExecutePlan(EState *estate,
                if (numberTuples && numberTuples == current_tuple_count)
                        break;
        }
+
+       if (use_parallel_mode)
+               ExitParallelMode();
 }
 
 
index e6930c1d51c929f4c1306ea38b749a757543929e..3bb820692d26702703013ea107bdfcb1b3852166 100644 (file)
@@ -442,6 +442,23 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
                                                                                        pei->instrumentation);
 }
 
+/*
+ * Clean up whatever ParallelExecutreInfo resources still exist after
+ * ExecParallelFinish.  We separate these routines because someone might
+ * want to examine the contents of the DSM after ExecParallelFinish and
+ * before calling this routine.
+ */
+void
+ExecParallelCleanup(ParallelExecutorInfo *pei)
+{
+       if (pei->pcxt != NULL)
+       {
+               DestroyParallelContext(pei->pcxt);
+               pei->pcxt = NULL;
+       }
+       pfree(pei);
+}
+
 /*
  * Create a DestReceiver to write tuples we produce to the shm_mq designated
  * for that purpose.
index c689a4d17a0316b9498338f04b6df1185129ffa1..7e2272f634ba460782adc30f78ea52cf1011dac2 100644 (file)
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/relscan.h"
+#include "access/xact.h"
 #include "executor/execdebug.h"
 #include "executor/execParallel.h"
 #include "executor/nodeGather.h"
@@ -45,7 +46,6 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
        gatherstate = makeNode(GatherState);
        gatherstate->ps.plan = (Plan *) node;
        gatherstate->ps.state = estate;
-       gatherstate->need_to_scan_workers = false;
        gatherstate->need_to_scan_locally = !node->single_copy;
 
        /*
@@ -106,52 +106,57 @@ ExecGather(GatherState *node)
         * needs to allocate large dynamic segement, so it is better to do if it
         * is really needed.
         */
-       if (!node->pei)
+       if (!node->initialized)
        {
                EState     *estate = node->ps.state;
-
-               /* Initialize the workers required to execute Gather node. */
-               node->pei = ExecInitParallelPlan(node->ps.lefttree,
-                                                                                estate,
-                                                                 ((Gather *) (node->ps.plan))->num_workers);
+               Gather     *gather = (Gather *) node->ps.plan;
 
                /*
-                * Register backend workers. If the required number of workers are not
-                * available then we perform the scan with available workers and if
-                * there are no more workers available, then the Gather node will just
-                * scan locally.
+                * Sometimes we might have to run without parallelism; but if
+                * parallel mode is active then we can try to fire up some workers.
                 */
-               LaunchParallelWorkers(node->pei->pcxt);
-
-               node->funnel = CreateTupleQueueFunnel();
-
-               for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+               if (gather->num_workers > 0 && IsInParallelMode())
                {
-                       if (node->pei->pcxt->worker[i].bgwhandle)
+                       bool    got_any_worker = false;
+
+                       /* Initialize the workers required to execute Gather node. */
+                       node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                                                                        estate,
+                                                                                        gather->num_workers);
+
+                       /*
+                        * Register backend workers. We might not get as many as we
+                        * requested, or indeed any at all.
+                        */
+                       LaunchParallelWorkers(node->pei->pcxt);
+
+                       /* Set up a tuple queue to collect the results. */
+                       node->funnel = CreateTupleQueueFunnel();
+                       for (i = 0; i < node->pei->pcxt->nworkers; ++i)
                        {
-                               shm_mq_set_handle(node->pei->tqueue[i],
-                                                                 node->pei->pcxt->worker[i].bgwhandle);
-                               RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]);
-                               node->need_to_scan_workers = true;
+                               if (node->pei->pcxt->worker[i].bgwhandle)
+                               {
+                                       shm_mq_set_handle(node->pei->tqueue[i],
+                                                                         node->pei->pcxt->worker[i].bgwhandle);
+                                       RegisterTupleQueueOnFunnel(node->funnel,
+                                                                                          node->pei->tqueue[i]);
+                                       got_any_worker = true;
+                               }
                        }
+
+                       /* No workers?  Then never mind. */
+                       if (!got_any_worker)
+                               ExecShutdownGather(node);
                }
 
-               /* If no workers are available, we must always scan locally. */
-               if (!node->need_to_scan_workers)
-                       node->need_to_scan_locally = true;
+               /* Run plan locally if no workers or not single-copy. */
+               node->need_to_scan_locally = (node->funnel == NULL)
+                       || !gather->single_copy;
+               node->initialized = true;
        }
 
        slot = gather_getnext(node);
 
-       if (TupIsNull(slot))
-       {
-               /*
-                * Destroy the parallel context once we complete fetching all the
-                * tuples.  Otherwise, the DSM and workers will stick around for the
-                * lifetime of the entire statement.
-                */
-               ExecShutdownGather(node);
-       }
        return slot;
 }
 
@@ -194,10 +199,9 @@ gather_getnext(GatherState *gatherstate)
         */
        slot = gatherstate->ps.ps_ProjInfo->pi_slot;
 
-       while (gatherstate->need_to_scan_workers ||
-                  gatherstate->need_to_scan_locally)
+       while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
        {
-               if (gatherstate->need_to_scan_workers)
+               if (gatherstate->funnel != NULL)
                {
                        bool            done = false;
 
@@ -206,7 +210,7 @@ gather_getnext(GatherState *gatherstate)
                                                                           gatherstate->need_to_scan_locally,
                                                                           &done);
                        if (done)
-                               gatherstate->need_to_scan_workers = false;
+                               ExecShutdownGather(gatherstate);
 
                        if (HeapTupleIsValid(tup))
                        {
@@ -247,30 +251,20 @@ gather_getnext(GatherState *gatherstate)
 void
 ExecShutdownGather(GatherState *node)
 {
-       Gather *gather;
-
-       if (node->pei == NULL || node->pei->pcxt == NULL)
-               return;
-
-       /*
-        * Ensure all workers have finished before destroying the parallel context
-        * to ensure a clean exit.
-        */
-       if (node->funnel)
+       /* Shut down tuple queue funnel before shutting down workers. */
+       if (node->funnel != NULL)
        {
                DestroyTupleQueueFunnel(node->funnel);
                node->funnel = NULL;
        }
 
-       ExecParallelFinish(node->pei);
-
-       /* destroy parallel context. */
-       DestroyParallelContext(node->pei->pcxt);
-       node->pei->pcxt = NULL;
-
-       gather = (Gather *) node->ps.plan;
-       node->need_to_scan_locally = !gather->single_copy;
-       node->need_to_scan_workers = false;
+       /* Now shut down the workers. */
+       if (node->pei != NULL)
+       {
+               ExecParallelFinish(node->pei);
+               ExecParallelCleanup(node->pei);
+               node->pei = NULL;
+       }
 }
 
 /* ----------------------------------------------------------------
@@ -295,5 +289,7 @@ ExecReScanGather(GatherState *node)
         */
        ExecShutdownGather(node);
 
+       node->initialized = false;
+
        ExecReScan(node->ps.lefttree);
 }
index 4fc797ad9820600672632aa9d2575a70b930b94c..505500e76b54898bbe12105a51da6a8ef91a7154 100644 (file)
@@ -32,5 +32,6 @@ typedef struct ParallelExecutorInfo
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                                         EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
+extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 
 #endif   /* EXECPARALLEL_H */
index 23670e1ff9bac8689e40ef8bd75e38d15efcf530..4fcdcc4067a09d7afade70b2e7299ee3fbe7ac4e 100644 (file)
@@ -1961,9 +1961,9 @@ typedef struct UniqueState
 typedef struct GatherState
 {
        PlanState       ps;                             /* its first field is NodeTag */
+       bool            initialized;
        struct ParallelExecutorInfo *pei;
        struct TupleQueueFunnel *funnel;
-       bool            need_to_scan_workers;
        bool            need_to_scan_locally;
 } GatherState;