]> granicus.if.org Git - postgresql/commitdiff
Push tuple limits through Gather and Gather Merge.
authorRobert Haas <rhaas@postgresql.org>
Tue, 29 Aug 2017 17:12:23 +0000 (13:12 -0400)
committerRobert Haas <rhaas@postgresql.org>
Tue, 29 Aug 2017 17:16:55 +0000 (13:16 -0400)
If we only need, say, 10 tuples in total, then we certainly don't need
more than 10 tuples from any single process.  Pushing down the limit
lets workers exit early when possible.  For Gather Merge, there is
an additional benefit: a Sort immediately below the Gather Merge can
be done as a bounded sort if there is an applicable limit.

Robert Haas and Tom Lane

Discussion: http://postgr.es/m/CA+TgmoYa3QKKrLj5rX7UvGqhH73G1Li4B-EKxrmASaca2tFu9Q@mail.gmail.com

src/backend/executor/execParallel.c
src/backend/executor/execProcnode.c
src/backend/executor/nodeGather.c
src/backend/executor/nodeGatherMerge.c
src/backend/executor/nodeLimit.c
src/include/executor/execParallel.h
src/include/executor/executor.h
src/include/nodes/execnodes.h
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index ce47f1d4a8b7eacd1fc9091582501f85bfaede3b..ad9eba63dd3169d872482de737d01c9900c83d52 100644 (file)
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT               UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS                            UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE              UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE               UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION   UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA                               UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT                UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED            UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT               UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS                            UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE              UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE               UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION   UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA                               UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT                UINT64CONST(0xE000000000000008)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE              65536
 
+/*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+       int64           tuples_needed;  /* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
 /*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+                                        int64 tuples_needed)
 {
        ParallelExecutorInfo *pei;
        ParallelContext *pcxt;
        ExecParallelEstimateContext e;
        ExecParallelInitializeDSMContext d;
+       FixedParallelExecutorState *fpes;
        char       *pstmt_data;
        char       *pstmt_space;
        char       *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
         * for the various things we need to store.
         */
 
+       /* Estimate space for fixed-size state. */
+       shm_toc_estimate_chunk(&pcxt->estimator,
+                                                  sizeof(FixedParallelExecutorState));
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+
        /* Estimate space for query text. */
        query_len = strlen(estate->es_sourceText);
        shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
         * asked for has been allocated or initialized yet, though, so do that.
         */
 
+       /* Store fixed-size state. */
+       fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+       fpes->tuples_needed = tuples_needed;
+       shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
        /* Store query string */
        query_string = shm_toc_allocate(pcxt->toc, query_len);
        memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+       FixedParallelExecutorState *fpes;
        BufferUsage *buffer_usage;
        DestReceiver *receiver;
        QueryDesc  *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        void       *area_space;
        dsa_area   *area;
 
+       /* Get fixed-size state. */
+       fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
        /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
        receiver = ExecParallelGetReceiver(seg, toc);
        instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        queryDesc->planstate->state->es_query_dsa = area;
        ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
-       /* Run the plan */
-       ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+       /* Pass down any tuple bound */
+       ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+       /*
+        * Run the plan.  If we specified a tuple bound, be careful not to demand
+        * more tuples than that.
+        */
+       ExecutorRun(queryDesc,
+                               ForwardScanDirection,
+                               fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+                               true);
 
        /* Shut down the executor */
        ExecutorFinish(queryDesc);
index 36d2914249c42f9905a4aad3caa70e00cce63988..c1aa5064c909776780e33ad72fbfb9556dec5392 100644 (file)
@@ -757,3 +757,124 @@ ExecShutdownNode(PlanState *node)
 
        return false;
 }
+
+/*
+ * ExecSetTupleBound
+ *
+ * Set a tuple bound for a planstate node.  This lets child plan nodes
+ * optimize based on the knowledge that the maximum number of tuples that
+ * their parent will demand is limited.  The tuple bound for a node may
+ * only be changed between scans (i.e., after node initialization or just
+ * before an ExecReScan call).
+ *
+ * Any negative tuples_needed value means "no limit", which should be the
+ * default assumption when this is not called at all for a particular node.
+ *
+ * Note: if this is called repeatedly on a plan tree, the exact same set
+ * of nodes must be updated with the new limit each time; be careful that
+ * only unchanging conditions are tested here.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+       /*
+        * Since this function recurses, in principle we should check stack depth
+        * here.  In practice, it's probably pointless since the earlier node
+        * initialization tree traversal would surely have consumed more stack.
+        */
+
+       if (IsA(child_node, SortState))
+       {
+               /*
+                * If it is a Sort node, notify it that it can use bounded sort.
+                *
+                * Note: it is the responsibility of nodeSort.c to react properly to
+                * changes of these parameters.  If we ever redesign this, it'd be a
+                * good idea to integrate this signaling with the parameter-change
+                * mechanism.
+                */
+               SortState  *sortState = (SortState *) child_node;
+
+               if (tuples_needed < 0)
+               {
+                       /* make sure flag gets reset if needed upon rescan */
+                       sortState->bounded = false;
+               }
+               else
+               {
+                       sortState->bounded = true;
+                       sortState->bound = tuples_needed;
+               }
+       }
+       else if (IsA(child_node, MergeAppendState))
+       {
+               /*
+                * If it is a MergeAppend, we can apply the bound to any nodes that
+                * are children of the MergeAppend, since the MergeAppend surely need
+                * read no more than that many tuples from any one input.
+                */
+               MergeAppendState *maState = (MergeAppendState *) child_node;
+               int                     i;
+
+               for (i = 0; i < maState->ms_nplans; i++)
+                       ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+       }
+       else if (IsA(child_node, ResultState))
+       {
+               /*
+                * Similarly, for a projecting Result, we can apply the bound to its
+                * child node.
+                *
+                * If Result supported qual checking, we'd have to punt on seeing a
+                * qual.  Note that having a resconstantqual is not a showstopper: if
+                * that condition succeeds it affects nothing, while if it fails, no
+                * rows will be demanded from the Result child anyway.
+                */
+               if (outerPlanState(child_node))
+                       ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+       }
+       else if (IsA(child_node, SubqueryScanState))
+       {
+               /*
+                * We can also descend through SubqueryScan, but only if it has no
+                * qual (otherwise it might discard rows).
+                */
+               SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
+
+               if (subqueryState->ss.ps.qual == NULL)
+                       ExecSetTupleBound(tuples_needed, subqueryState->subplan);
+       }
+       else if (IsA(child_node, GatherState))
+       {
+               /*
+                * A Gather node can propagate the bound to its workers.  As with
+                * MergeAppend, no one worker could possibly need to return more
+                * tuples than the Gather itself needs to.
+                *
+                * Note: As with Sort, the Gather node is responsible for reacting
+                * properly to changes to this parameter.
+                */
+               GatherState *gstate = (GatherState *) child_node;
+
+               gstate->tuples_needed = tuples_needed;
+
+               /* Also pass down the bound to our own copy of the child plan */
+               ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+       }
+       else if (IsA(child_node, GatherMergeState))
+       {
+               /* Same comments as for Gather */
+               GatherMergeState *gstate = (GatherMergeState *) child_node;
+
+               gstate->tuples_needed = tuples_needed;
+
+               ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+       }
+
+       /*
+        * In principle we could descend through any plan node type that is
+        * certain not to discard or combine input rows; but on seeing a node that
+        * can do that, we can't propagate the bound any further.  For the moment
+        * it's unclear that any other cases are worth checking here.
+        */
+}
index e8d94ee6f38d062522706aa6832c9d4a89241745..a0f5a60d9322a1a4004b27130fd975bc01bd6f62 100644 (file)
@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
        gatherstate->ps.state = estate;
        gatherstate->ps.ExecProcNode = ExecGather;
        gatherstate->need_to_scan_locally = !node->single_copy;
+       gatherstate->tuples_needed = -1;
 
        /*
         * Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
                        if (!node->pei)
                                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                                                                 estate,
-                                                                                                gather->num_workers);
+                                                                                                gather->num_workers,
+                                                                                                node->tuples_needed);
 
                        /*
                         * Register backend workers. We might not get as many as we
index 64c62398bbebbe86398484187c28e9395aa9ccba..2526c584fd0d4230b613a107700522cf7b229361 100644 (file)
@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
        gm_state->ps.plan = (Plan *) node;
        gm_state->ps.state = estate;
        gm_state->ps.ExecProcNode = ExecGatherMerge;
+       gm_state->tuples_needed = -1;
 
        /*
         * Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
                        if (!node->pei)
                                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                                                                 estate,
-                                                                                                gm->num_workers);
+                                                                                                gm->num_workers,
+                                                                                                node->tuples_needed);
 
                        /* Try to launch workers. */
                        pcxt = node->pei->pcxt;
index ceb6854b5979deb8207f2b51c8fac6e6acd1c161..883f46ce7c997ad40b555c071dce52ea9eb2919c 100644 (file)
@@ -27,7 +27,7 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);
 
 
 /* ----------------------------------------------------------------
@@ -297,92 +297,26 @@ recompute_limits(LimitState *node)
        /* Set state-machine state */
        node->lstate = LIMIT_RESCAN;
 
-       /* Notify child node about limit, if useful */
-       pass_down_bound(node, outerPlanState(node));
+       /*
+        * Notify child node about limit.  Note: think not to "optimize" by
+        * skipping ExecSetTupleBound if compute_tuples_needed returns < 0.  We
+        * must update the child node anyway, in case this is a rescan and the
+        * previous time we got a different result.
+        */
+       ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }
 
 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  We can also pass down the bound through plan nodes
- * that cannot remove or combine input rows; for example, if our input is a
- * MergeAppend, we can apply the same bound to any Sorts that are direct
- * children of the MergeAppend, since the MergeAppend surely need not read
- * more than that many tuples from any one input.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
  */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
 {
-       /*
-        * Since this function recurses, in principle we should check stack depth
-        * here.  In practice, it's probably pointless since the earlier node
-        * initialization tree traversal would surely have consumed more stack.
-        */
-
-       if (IsA(child_node, SortState))
-       {
-               SortState  *sortState = (SortState *) child_node;
-               int64           tuples_needed = node->count + node->offset;
-
-               /* negative test checks for overflow in sum */
-               if (node->noCount || tuples_needed < 0)
-               {
-                       /* make sure flag gets reset if needed upon rescan */
-                       sortState->bounded = false;
-               }
-               else
-               {
-                       sortState->bounded = true;
-                       sortState->bound = tuples_needed;
-               }
-       }
-       else if (IsA(child_node, MergeAppendState))
-       {
-               /* Pass down the bound through MergeAppend */
-               MergeAppendState *maState = (MergeAppendState *) child_node;
-               int                     i;
-
-               for (i = 0; i < maState->ms_nplans; i++)
-                       pass_down_bound(node, maState->mergeplans[i]);
-       }
-       else if (IsA(child_node, ResultState))
-       {
-               /*
-                * We also have to be prepared to look through a Result, since the
-                * planner might stick one atop MergeAppend for projection purposes.
-                *
-                * If Result supported qual checking, we'd have to punt on seeing a
-                * qual.  Note that having a resconstantqual is not a showstopper: if
-                * that fails we're not getting any rows at all.
-                */
-               if (outerPlanState(child_node))
-                       pass_down_bound(node, outerPlanState(child_node));
-       }
-       else if (IsA(child_node, SubqueryScanState))
-       {
-               /*
-                * We can also look through SubqueryScan, but only if it has no qual
-                * (otherwise it might discard rows).
-                */
-               SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
-
-               if (subqueryState->ss.ps.qual == NULL)
-                       pass_down_bound(node, subqueryState->subplan);
-       }
-
-       /*
-        * In principle we could look through any plan node type that is certain
-        * not to discard or combine input rows.  In practice, there are not many
-        * node types that the planner might put between Sort and Limit, so trying
-        * to be very general is not worth the trouble.
-        */
+       if (node->noCount)
+               return -1;
+       /* Note: if this overflows, we'll return a negative value, which is OK */
+       return node->count + node->offset;
 }
 
 /* ----------------------------------------------------------------
index bd0a87fa0416a7ef2224ebd7f3e7b14593fdf098..79b886706f771d7c4d4c7a523056edf0e65c964d 100644 (file)
@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-                                        EState *estate, int nworkers);
+                                        EState *estate, int nworkers, int64 tuples_needed);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
index eacbea3c3655aec2314954648515f17235629777..f48a603daebe34bb1befa3b823c91df8e2bd57e5 100644 (file)
@@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
 /* ----------------------------------------------------------------
index 3272c4b315575f6f17cbc67afe1f68decb6f4338..15a84269ec96fdda649d2ddeb0c3de743ed6d27f 100644 (file)
@@ -1919,6 +1919,7 @@ typedef struct GatherState
        struct TupleQueueReader **reader;
        TupleTableSlot *funnel_slot;
        bool            need_to_scan_locally;
+       int64           tuples_needed;  /* tuple bound, see ExecSetTupleBound */
 } GatherState;
 
 /* ----------------
@@ -1944,6 +1945,7 @@ typedef struct GatherMergeState
        struct binaryheap *gm_heap; /* binary heap of slot indices */
        bool            gm_initialized; /* gather merge initilized ? */
        bool            need_to_scan_locally;
+       int64           tuples_needed;  /* tuple bound, see ExecSetTupleBound */
        int                     gm_nkeys;
        SortSupport gm_sortkeys;        /* array of length ms_nkeys */
        struct GMReaderTupleBuffer *gm_tuple_buffers;   /* tuple buffer per reader */
index 084f0f0c8e11ee39f628a3a00613817eec5055bf..ccad18e978ff9866e1afb068736c21bdfdd926dd 100644 (file)
@@ -300,6 +300,29 @@ select count(*) from tenk1 group by twenty;
    500
 (20 rows)
 
+reset enable_hashagg;
+-- gather merge test with a LIMIT
+explain (costs off)
+  select fivethous from tenk1 order by fivethous limit 4;
+                  QUERY PLAN                  
+----------------------------------------------
+ Limit
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: fivethous
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+select fivethous from tenk1 order by fivethous limit 4;
+ fivethous 
+-----------
+         0
+         0
+         1
+         1
+(4 rows)
+
 -- gather merge test with 0 worker
 set max_parallel_workers = 0;
 explain (costs off)
@@ -325,7 +348,6 @@ select string4 from tenk1 order by string4 limit 5;
 (5 rows)
 
 reset max_parallel_workers;
-reset enable_hashagg;
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
 explain (costs off)
index 58c3f598905cc739290e0cb92727b6f309cd8bb4..c0debddbcd1d41c12500894ca60cd0908dcdd9d7 100644 (file)
@@ -118,13 +118,20 @@ explain (costs off)
 
 select count(*) from tenk1 group by twenty;
 
+reset enable_hashagg;
+
+-- gather merge test with a LIMIT
+explain (costs off)
+  select fivethous from tenk1 order by fivethous limit 4;
+
+select fivethous from tenk1 order by fivethous limit 4;
+
 -- gather merge test with 0 worker
 set max_parallel_workers = 0;
 explain (costs off)
    select string4 from tenk1 order by string4 limit 5;
 select string4 from tenk1 order by string4 limit 5;
 reset max_parallel_workers;
-reset enable_hashagg;
 
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;