granicus.if.org Git - postgresql/commitdiff
Support Parallel Append plan nodes.
author Robert Haas <rhaas@postgresql.org>
Tue, 5 Dec 2017 22:28:39 +0000 (17:28 -0500)
committer Robert Haas <rhaas@postgresql.org>
Tue, 5 Dec 2017 22:28:39 +0000 (17:28 -0500)
When we create an Append node, we can spread out the workers over the
subplans instead of piling on to each subplan one at a time, which
should typically be a bit more efficient, both because the startup
cost of any plan executed entirely by one worker is paid only once and
also because of reduced contention.  We can also construct Append
plans using a mix of partial and non-partial subplans, which may allow
for parallelism in places that otherwise couldn't support it.
Unfortunately, this patch doesn't handle the important case of
parallelizing UNION ALL by running each branch in a separate worker;
the executor infrastructure is added here, but more planner work is
needed.

Amit Khandekar, Robert Haas, Amul Sul, reviewed and tested by
Ashutosh Bapat, Amit Langote, Rafia Sabih, Amit Kapila, and
Rajkumar Raghuwanshi.

Discussion: http://postgr.es/m/CAJ3gD9dy0K_E8r727heqXoBmWZ83HwLFwdcaSSmBQ1+S+vRuUQ@mail.gmail.com

31 files changed:
doc/src/sgml/config.sgml
doc/src/sgml/monitoring.sgml
src/backend/executor/execParallel.c
src/backend/executor/nodeAppend.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/list.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/allpaths.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/path/joinrels.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/plan/planner.c
src/backend/optimizer/prep/prepunion.c
src/backend/optimizer/util/pathnode.c
src/backend/storage/lmgr/lwlock.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/executor/nodeAppend.h
src/include/nodes/execnodes.h
src/include/nodes/pg_list.h
src/include/nodes/plannodes.h
src/include/nodes/relation.h
src/include/optimizer/cost.h
src/include/optimizer/pathnode.h
src/include/storage/lwlock.h
src/test/regress/expected/inherit.out
src/test/regress/expected/select_parallel.out
src/test/regress/expected/sysviews.out
src/test/regress/sql/inherit.sql
src/test/regress/sql/select_parallel.sql

index 563ad1fc7f8a1bee1f6059f64fd06f2d5e84b52b..9ae6861cd7aae40b7647b7bc6864f2517c8ecc42 100644 (file)
@@ -3633,6 +3633,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-parallel-append" xreflabel="enable_parallel_append">
+      <term><varname>enable_parallel_append</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_parallel_append</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of parallel-aware
+        append plan types. The default is <literal>on</>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-partition-wise-join" xreflabel="enable_partition_wise_join">
       <term><varname>enable_partition_wise_join</varname> (<type>boolean</type>)
       <indexterm>
index 8d461c81459d207d75f095389bdbfb6e6f628f7e..b6f80d97080d565bde2e8c3c8c37c944f8b57c03 100644 (file)
@@ -845,7 +845,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
       <tbody>
        <row>
-        <entry morerows="62"><literal>LWLock</literal></entry>
+        <entry morerows="63"><literal>LWLock</literal></entry>
         <entry><literal>ShmemIndexLock</literal></entry>
         <entry>Waiting to find or allocate space in shared memory.</entry>
        </row>
@@ -1116,6 +1116,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>tbm</literal></entry>
          <entry>Waiting for TBM shared iterator lock.</entry>
         </row>
+        <row>
+         <entry><literal>parallel_append</literal></entry>
+         <entry>Waiting to choose the next subplan during Parallel Append plan
+         execution.</entry>
+        </row>
         <row>
          <entry morerows="9"><literal>Lock</literal></entry>
          <entry><literal>relation</literal></entry>
index ff5cff59b0f02b4e375c8b8c564e9411f6961ac9..558cb08b07e95d05b996ba3ac9f3018cde697405 100644 (file)
@@ -26,6 +26,7 @@
 #include "executor/execExpr.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeAppend.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
@@ -250,6 +251,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
                                ExecForeignScanEstimate((ForeignScanState *) planstate,
                                                                                e->pcxt);
                        break;
+               case T_AppendState:
+                       if (planstate->plan->parallel_aware)
+                               ExecAppendEstimate((AppendState *) planstate,
+                                                                  e->pcxt);
+                       break;
                case T_CustomScanState:
                        if (planstate->plan->parallel_aware)
                                ExecCustomScanEstimate((CustomScanState *) planstate,
@@ -453,6 +459,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
                                ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
                                                                                         d->pcxt);
                        break;
+               case T_AppendState:
+                       if (planstate->plan->parallel_aware)
+                               ExecAppendInitializeDSM((AppendState *) planstate,
+                                                                               d->pcxt);
+                       break;
                case T_CustomScanState:
                        if (planstate->plan->parallel_aware)
                                ExecCustomScanInitializeDSM((CustomScanState *) planstate,
@@ -884,6 +895,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
                                ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
                                                                                           pcxt);
                        break;
+               case T_AppendState:
+                       if (planstate->plan->parallel_aware)
+                               ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
+                       break;
                case T_CustomScanState:
                        if (planstate->plan->parallel_aware)
                                ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
@@ -1194,6 +1209,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
                                ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
                                                                                                pwcxt);
                        break;
+               case T_AppendState:
+                       if (planstate->plan->parallel_aware)
+                               ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
+                       break;
                case T_CustomScanState:
                        if (planstate->plan->parallel_aware)
                                ExecCustomScanInitializeWorker((CustomScanState *) planstate,
index 1d2fb35d551eb952a0647b6d26df5c344a4d65c0..246a0b2d852a0e6331791168bf35ff531eccf7f3 100644 (file)
 #include "executor/nodeAppend.h"
 #include "miscadmin.h"
 
-static TupleTableSlot *ExecAppend(PlanState *pstate);
-static bool exec_append_initialize_next(AppendState *appendstate);
-
-
-/* ----------------------------------------------------------------
- *             exec_append_initialize_next
- *
- *             Sets up the append state node for the "next" scan.
- *
- *             Returns t iff there is a "next" scan to process.
- * ----------------------------------------------------------------
- */
-static bool
-exec_append_initialize_next(AppendState *appendstate)
+/* Shared state for parallel-aware Append. */
+struct ParallelAppendState
 {
-       int                     whichplan;
+       LWLock          pa_lock;                /* mutual exclusion to choose next subplan */
+       int                     pa_next_plan;   /* next plan to choose by any worker */
 
        /*
-        * get information from the append node
+        * pa_finished[i] should be true if no more workers should select subplan
+        * i.  For a non-partial plan, this should be set to true as soon as a
+        * worker selects the plan; for a partial plan, it remains false until
+        * some worker executes the plan to completion.
         */
-       whichplan = appendstate->as_whichplan;
+       bool            pa_finished[FLEXIBLE_ARRAY_MEMBER];
+};
 
-       if (whichplan < 0)
-       {
-               /*
-                * if scanning in reverse, we start at the last scan in the list and
-                * then proceed back to the first.. in any case we inform ExecAppend
-                * that we are at the end of the line by returning false
-                */
-               appendstate->as_whichplan = 0;
-               return false;
-       }
-       else if (whichplan >= appendstate->as_nplans)
-       {
-               /*
-                * as above, end the scan if we go beyond the last scan in our list..
-                */
-               appendstate->as_whichplan = appendstate->as_nplans - 1;
-               return false;
-       }
-       else
-       {
-               return true;
-       }
-}
+#define INVALID_SUBPLAN_INDEX          -1
+
+static TupleTableSlot *ExecAppend(PlanState *pstate);
+static bool choose_next_subplan_locally(AppendState *node);
+static bool choose_next_subplan_for_leader(AppendState *node);
+static bool choose_next_subplan_for_worker(AppendState *node);
 
 /* ----------------------------------------------------------------
  *             ExecInitAppend
@@ -185,10 +161,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
        appendstate->ps.ps_ProjInfo = NULL;
 
        /*
-        * initialize to scan first subplan
+        * Parallel-aware append plans must choose the first subplan to execute by
+        * looking at shared memory, but non-parallel-aware append plans can
+        * always start with the first subplan.
         */
-       appendstate->as_whichplan = 0;
-       exec_append_initialize_next(appendstate);
+       appendstate->as_whichplan =
+               appendstate->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
+
+       /* If parallel-aware, this will be overridden later. */
+       appendstate->choose_next_subplan = choose_next_subplan_locally;
 
        return appendstate;
 }
@@ -204,6 +185,11 @@ ExecAppend(PlanState *pstate)
 {
        AppendState *node = castNode(AppendState, pstate);
 
+       /* If no subplan has been chosen, we must choose one before proceeding. */
+       if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+               !node->choose_next_subplan(node))
+               return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
        for (;;)
        {
                PlanState  *subnode;
@@ -214,6 +200,7 @@ ExecAppend(PlanState *pstate)
                /*
                 * figure out which subplan we are currently processing
                 */
+               Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
                subnode = node->appendplans[node->as_whichplan];
 
                /*
@@ -231,19 +218,9 @@ ExecAppend(PlanState *pstate)
                        return result;
                }
 
-               /*
-                * Go on to the "next" subplan in the appropriate direction. If no
-                * more subplans, return the empty slot set up for us by
-                * ExecInitAppend.
-                */
-               if (ScanDirectionIsForward(node->ps.state->es_direction))
-                       node->as_whichplan++;
-               else
-                       node->as_whichplan--;
-               if (!exec_append_initialize_next(node))
+               /* choose new subplan; if none, we're done */
+               if (!node->choose_next_subplan(node))
                        return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-               /* Else loop back and try to get a tuple from the new subplan */
        }
 }
 
@@ -298,6 +275,246 @@ ExecReScanAppend(AppendState *node)
                if (subnode->chgParam == NULL)
                        ExecReScan(subnode);
        }
-       node->as_whichplan = 0;
-       exec_append_initialize_next(node);
+
+       node->as_whichplan =
+               node->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
+}
+
+/* ----------------------------------------------------------------
+ *                                             Parallel Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *             ExecAppendEstimate
+ *
+ *             Compute the amount of space we'll need in the parallel
+ *             query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendEstimate(AppendState *node,
+                                  ParallelContext *pcxt)
+{
+       node->pstate_len =
+               add_size(offsetof(ParallelAppendState, pa_finished),
+                                sizeof(bool) * node->as_nplans);
+
+       shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+
+/* ----------------------------------------------------------------
+ *             ExecAppendInitializeDSM
+ *
+ *             Set up shared state for Parallel Append.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeDSM(AppendState *node,
+                                               ParallelContext *pcxt)
+{
+       ParallelAppendState *pstate;
+
+       pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
+       memset(pstate, 0, node->pstate_len);
+       LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
+       shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
+
+       node->as_pstate = pstate;
+       node->choose_next_subplan = choose_next_subplan_for_leader;
+}
+
+/* ----------------------------------------------------------------
+ *             ExecAppendReInitializeDSM
+ *
+ *             Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
+{
+       ParallelAppendState *pstate = node->as_pstate;
+
+       pstate->pa_next_plan = 0;
+       memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecAppendInitializeWorker
+ *
+ *             Copy relevant information from TOC into planstate, and initialize
+ *             whatever is required to choose and execute the optimal subplan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
+{
+       node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
+       node->choose_next_subplan = choose_next_subplan_for_worker;
+}
+
+/* ----------------------------------------------------------------
+ *             choose_next_subplan_locally
+ *
+ *             Choose next subplan for a non-parallel-aware Append,
+ *             returning false if there are no more.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_locally(AppendState *node)
+{
+       int                     whichplan = node->as_whichplan;
+
+       /* We should never see INVALID_SUBPLAN_INDEX in this case. */
+       Assert(whichplan >= 0 && whichplan <= node->as_nplans);
+
+       if (ScanDirectionIsForward(node->ps.state->es_direction))
+       {
+               if (whichplan >= node->as_nplans - 1)
+                       return false;
+               node->as_whichplan++;
+       }
+       else
+       {
+               if (whichplan <= 0)
+                       return false;
+               node->as_whichplan--;
+       }
+
+       return true;
+}
+
+/* ----------------------------------------------------------------
+ *             choose_next_subplan_for_leader
+ *
+ *      Try to pick a plan which doesn't commit us to doing much
+ *      work locally, so that as much work as possible is done in
+ *      the workers.  Cheapest subplans are at the end.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_for_leader(AppendState *node)
+{
+       ParallelAppendState *pstate = node->as_pstate;
+       Append     *append = (Append *) node->ps.plan;
+
+       /* Backward scan is not supported by parallel-aware plans */
+       Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+       LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
+
+       if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+       {
+               /* Mark just-completed subplan as finished. */
+               node->as_pstate->pa_finished[node->as_whichplan] = true;
+       }
+       else
+       {
+               /* Start with last subplan. */
+               node->as_whichplan = node->as_nplans - 1;
+       }
+
+       /* Loop until we find a subplan to execute. */
+       while (pstate->pa_finished[node->as_whichplan])
+       {
+               if (node->as_whichplan == 0)
+               {
+                       pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+                       node->as_whichplan = INVALID_SUBPLAN_INDEX;
+                       LWLockRelease(&pstate->pa_lock);
+                       return false;
+               }
+               node->as_whichplan--;
+       }
+
+       /* If non-partial, immediately mark as finished. */
+       if (node->as_whichplan < append->first_partial_plan)
+               node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+       LWLockRelease(&pstate->pa_lock);
+
+       return true;
+}
+
+/* ----------------------------------------------------------------
+ *             choose_next_subplan_for_worker
+ *
+ *             Choose next subplan for a parallel-aware Append, returning
+ *             false if there are no more.
+ *
+ *             We start from the first plan and advance through the list;
+ *             when we get to the end, we loop back to the first
+ *             partial plan.  This assigns the non-partial plans first
+ *             in order of descending cost and then spreads out the
+ *             workers as evenly as possible across the remaining partial
+ *             plans.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_for_worker(AppendState *node)
+{
+       ParallelAppendState *pstate = node->as_pstate;
+       Append     *append = (Append *) node->ps.plan;
+
+       /* Backward scan is not supported by parallel-aware plans */
+       Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+       LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
+
+       /* Mark just-completed subplan as finished. */
+       if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+               node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+       /* If all the plans are already done, we have nothing to do */
+       if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
+       {
+               LWLockRelease(&pstate->pa_lock);
+               return false;
+       }
+
+       /* Loop until we find a subplan to execute. */
+       while (pstate->pa_finished[pstate->pa_next_plan])
+       {
+               if (pstate->pa_next_plan < node->as_nplans - 1)
+               {
+                       /* Advance to next plan. */
+                       pstate->pa_next_plan++;
+               }
+               else if (append->first_partial_plan < node->as_nplans)
+               {
+                       /* Loop back to first partial plan. */
+                       pstate->pa_next_plan = append->first_partial_plan;
+               }
+               else
+               {
+                       /* At last plan, no partial plans, arrange to bail out. */
+                       pstate->pa_next_plan = node->as_whichplan;
+               }
+
+               if (pstate->pa_next_plan == node->as_whichplan)
+               {
+                       /* We've tried everything! */
+                       pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+                       LWLockRelease(&pstate->pa_lock);
+                       return false;
+               }
+       }
+
+       /* Pick the plan we found, and advance pa_next_plan one more time. */
+       node->as_whichplan = pstate->pa_next_plan++;
+       if (pstate->pa_next_plan >= node->as_nplans)
+       {
+               Assert(append->first_partial_plan < node->as_nplans);
+               pstate->pa_next_plan = append->first_partial_plan;
+       }
+
+       /* If non-partial, immediately mark as finished. */
+       if (node->as_whichplan < append->first_partial_plan)
+               node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+       LWLockRelease(&pstate->pa_lock);
+
+       return true;
 }
index aff9a62106d64bd52b68e98487fd968d40aa3016..b1515dd8e1031bd7c885ff9745d8a0cab174709f 100644 (file)
@@ -242,6 +242,7 @@ _copyAppend(const Append *from)
         */
        COPY_NODE_FIELD(partitioned_rels);
        COPY_NODE_FIELD(appendplans);
+       COPY_SCALAR_FIELD(first_partial_plan);
 
        return newnode;
 }
index acaf4b53153c18ff038504b1752f851f7f9e3f37..bee6244adc1e0cb22a47fe0d9137af90f613adf0 100644 (file)
@@ -1249,6 +1249,44 @@ list_copy_tail(const List *oldlist, int nskip)
        return newlist;
 }
 
+/*
+ * Sort a list using qsort. A sorted list is built but the cells of the
+ * original list are re-used.  The comparator function receives arguments of
+ * type ListCell **
+ */
+List *
+list_qsort(const List *list, list_qsort_comparator cmp)
+{
+       ListCell   *cell;
+       int                     i;
+       int                     len = list_length(list);
+       ListCell  **list_arr;
+       List       *new_list;
+
+       if (len == 0)
+               return NIL;
+
+       i = 0;
+       list_arr = palloc(sizeof(ListCell *) * len);
+       foreach(cell, list)
+               list_arr[i++] = cell;
+
+       qsort(list_arr, len, sizeof(ListCell *), cmp);
+
+       new_list = (List *) palloc(sizeof(List));
+       new_list->type = list->type;
+       new_list->length = len;
+       new_list->head = list_arr[0];
+       new_list->tail = list_arr[len - 1];
+
+       for (i = 0; i < len - 1; i++)
+               list_arr[i]->next = list_arr[i + 1];
+
+       list_arr[len - 1]->next = NULL;
+       pfree(list_arr);
+       return new_list;
+}
+
 /*
  * Temporary compatibility functions
  *
index c97ee24ade15904374bb5db4c152cb4a5a2e2674..b59a5219a724230536f959f5567d0c4f76ecb1bc 100644 (file)
@@ -399,6 +399,7 @@ _outAppend(StringInfo str, const Append *node)
 
        WRITE_NODE_FIELD(partitioned_rels);
        WRITE_NODE_FIELD(appendplans);
+       WRITE_INT_FIELD(first_partial_plan);
 }
 
 static void
index 7eb67fc040787e3709b384131ef68f2cf822b28d..0d17ae89b0c253f4886cc262e2e6b77f104643b8 100644 (file)
@@ -1600,6 +1600,7 @@ _readAppend(void)
 
        READ_NODE_FIELD(partitioned_rels);
        READ_NODE_FIELD(appendplans);
+       READ_INT_FIELD(first_partial_plan);
 
        READ_DONE();
 }
index 44f6b0344208a9509aab0b09a3bc2fd0cd3de73f..47986ba80a58ea464aafae6427baaf728aa72838 100644 (file)
@@ -101,7 +101,8 @@ static void generate_mergeappend_paths(PlannerInfo *root, RelOptInfo *rel,
 static Path *get_cheapest_parameterized_child_path(PlannerInfo *root,
                                                                          RelOptInfo *rel,
                                                                          Relids required_outer);
-static List *accumulate_append_subpath(List *subpaths, Path *path);
+static void accumulate_append_subpath(Path *path,
+                                                 List **subpaths, List **special_subpaths);
 static void set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel,
                                          Index rti, RangeTblEntry *rte);
 static void set_function_pathlist(PlannerInfo *root, RelOptInfo *rel,
@@ -1331,13 +1332,17 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
        List       *subpaths = NIL;
        bool            subpaths_valid = true;
        List       *partial_subpaths = NIL;
+       List       *pa_partial_subpaths = NIL;
+       List       *pa_nonpartial_subpaths = NIL;
        bool            partial_subpaths_valid = true;
+       bool            pa_subpaths_valid = enable_parallel_append;
        List       *all_child_pathkeys = NIL;
        List       *all_child_outers = NIL;
        ListCell   *l;
        List       *partitioned_rels = NIL;
        RangeTblEntry *rte;
        bool            build_partitioned_rels = false;
+       double          partial_rows = -1;
 
        if (IS_SIMPLE_REL(rel))
        {
@@ -1388,6 +1393,7 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
        {
                RelOptInfo *childrel = lfirst(l);
                ListCell   *lcp;
+               Path       *cheapest_partial_path = NULL;
 
                /*
                 * If we need to build partitioned_rels, accumulate the partitioned
@@ -1408,18 +1414,69 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
                 * If not, there's no workable unparameterized path.
                 */
                if (childrel->cheapest_total_path->param_info == NULL)
-                       subpaths = accumulate_append_subpath(subpaths,
-                                                                                                childrel->cheapest_total_path);
+                       accumulate_append_subpath(childrel->cheapest_total_path,
+                                                                         &subpaths, NULL);
                else
                        subpaths_valid = false;
 
                /* Same idea, but for a partial plan. */
                if (childrel->partial_pathlist != NIL)
-                       partial_subpaths = accumulate_append_subpath(partial_subpaths,
-                                                                                                                linitial(childrel->partial_pathlist));
+               {
+                       cheapest_partial_path = linitial(childrel->partial_pathlist);
+                       accumulate_append_subpath(cheapest_partial_path,
+                                                                         &partial_subpaths, NULL);
+               }
                else
                        partial_subpaths_valid = false;
 
+               /*
+                * Same idea, but for a parallel append mixing partial and non-partial
+                * paths.
+                */
+               if (pa_subpaths_valid)
+               {
+                       Path       *nppath = NULL;
+
+                       nppath =
+                               get_cheapest_parallel_safe_total_inner(childrel->pathlist);
+
+                       if (cheapest_partial_path == NULL && nppath == NULL)
+                       {
+                               /* Neither a partial nor a parallel-safe path?  Forget it. */
+                               pa_subpaths_valid = false;
+                       }
+                       else if (nppath == NULL ||
+                                        (cheapest_partial_path != NULL &&
+                                         cheapest_partial_path->total_cost < nppath->total_cost))
+                       {
+                               /* Partial path is cheaper or the only option. */
+                               Assert(cheapest_partial_path != NULL);
+                               accumulate_append_subpath(cheapest_partial_path,
+                                                                                 &pa_partial_subpaths,
+                                                                                 &pa_nonpartial_subpaths);
+
+                       }
+                       else
+                       {
+                               /*
+                                * Either we've got only a non-partial path, or we think that
+                                * a single backend can execute the best non-partial path
+                                * faster than all the parallel backends working together can
+                                * execute the best partial path.
+                                *
+                                * It might make sense to be more aggressive here.  Even if
+                                * the best non-partial path is more expensive than the best
+                                * partial path, it could still be better to choose the
+                                * non-partial path if there are several such paths that can
+                                * be given to different workers.  For now, we don't try to
+                                * figure that out.
+                                */
+                               accumulate_append_subpath(nppath,
+                                                                                 &pa_nonpartial_subpaths,
+                                                                                 NULL);
+                       }
+               }
+
                /*
                 * Collect lists of all the available path orderings and
                 * parameterizations for all the children.  We use these as a
@@ -1491,11 +1548,13 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
         * if we have zero or one live subpath due to constraint exclusion.)
         */
        if (subpaths_valid)
-               add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0,
-                                                                                                 partitioned_rels));
+               add_path(rel, (Path *) create_append_path(rel, subpaths, NIL,
+                                                                                                 NULL, 0, false,
+                                                                                                 partitioned_rels, -1));
 
        /*
-        * Consider an append of partial unordered, unparameterized partial paths.
+        * Consider an append of unordered, unparameterized partial paths.  Make
+        * it parallel-aware if possible.
         */
        if (partial_subpaths_valid)
        {
@@ -1503,12 +1562,7 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
                ListCell   *lc;
                int                     parallel_workers = 0;
 
-               /*
-                * Decide on the number of workers to request for this append path.
-                * For now, we just use the maximum value from among the members.  It
-                * might be useful to use a higher number if the Append node were
-                * smart enough to spread out the workers, but it currently isn't.
-                */
+               /* Find the highest number of workers requested for any subpath. */
                foreach(lc, partial_subpaths)
                {
                        Path       *path = lfirst(lc);
@@ -1517,9 +1571,78 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
                }
                Assert(parallel_workers > 0);
 
+               /*
+                * If the use of parallel append is permitted, always request at least
+                * log2(# of children) workers.  We assume it can be useful to have
+                * extra workers in this case because they will be spread out across
+                * the children.  The precise formula is just a guess, but we don't
+                * want to end up with a radically different answer for a table with N
+                * partitions vs. an unpartitioned table with the same data, so the
+                * use of some kind of log-scaling here seems to make some sense.
+                */
+               if (enable_parallel_append)
+               {
+                       parallel_workers = Max(parallel_workers,
+                                                                  fls(list_length(live_childrels)));
+                       parallel_workers = Min(parallel_workers,
+                                                                  max_parallel_workers_per_gather);
+               }
+               Assert(parallel_workers > 0);
+
                /* Generate a partial append path. */
-               appendpath = create_append_path(rel, partial_subpaths, NULL,
-                                                                               parallel_workers, partitioned_rels);
+               appendpath = create_append_path(rel, NIL, partial_subpaths, NULL,
+                                                                               parallel_workers,
+                                                                               enable_parallel_append,
+                                                                               partitioned_rels, -1);
+
+               /*
+                * Make sure any subsequent partial paths use the same row count
+                * estimate.
+                */
+               partial_rows = appendpath->path.rows;
+
+               /* Add the path. */
+               add_partial_path(rel, (Path *) appendpath);
+       }
+
+       /*
+        * Consider a parallel-aware append using a mix of partial and non-partial
+        * paths.  (This only makes sense if there's at least one child which has
+        * a non-partial path that is substantially cheaper than any partial path;
+        * otherwise, we should use the append path added in the previous step.)
+        */
+       if (pa_subpaths_valid && pa_nonpartial_subpaths != NIL)
+       {
+               AppendPath *appendpath;
+               ListCell   *lc;
+               int                     parallel_workers = 0;
+
+               /*
+                * Find the highest number of workers requested for any partial
+                * subpath.
+                */
+               foreach(lc, pa_partial_subpaths)
+               {
+                       Path       *path = lfirst(lc);
+
+                       parallel_workers = Max(parallel_workers, path->parallel_workers);
+               }
+
+               /*
+                * Same formula here as above.  It's even more important in this
+                * instance because the non-partial paths won't contribute anything to
+                * the planned number of parallel workers.
+                */
+               parallel_workers = Max(parallel_workers,
+                                                          fls(list_length(live_childrels)));
+               parallel_workers = Min(parallel_workers,
+                                                          max_parallel_workers_per_gather);
+               Assert(parallel_workers > 0);
+
+               appendpath = create_append_path(rel, pa_nonpartial_subpaths,
+                                                                               pa_partial_subpaths,
+                                                                               NULL, parallel_workers, true,
+                                                                               partitioned_rels, partial_rows);
                add_partial_path(rel, (Path *) appendpath);
        }
 
@@ -1567,13 +1690,14 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
                                subpaths_valid = false;
                                break;
                        }
-                       subpaths = accumulate_append_subpath(subpaths, subpath);
+                       accumulate_append_subpath(subpath, &subpaths, NULL);
                }
 
                if (subpaths_valid)
                        add_path(rel, (Path *)
-                                        create_append_path(rel, subpaths, required_outer, 0,
-                                                                               partitioned_rels));
+                                        create_append_path(rel, subpaths, NIL,
+                                                                               required_outer, 0, false,
+                                                                               partitioned_rels, -1));
        }
 }
 
@@ -1657,10 +1781,10 @@ generate_mergeappend_paths(PlannerInfo *root, RelOptInfo *rel,
                        if (cheapest_startup != cheapest_total)
                                startup_neq_total = true;
 
-                       startup_subpaths =
-                               accumulate_append_subpath(startup_subpaths, cheapest_startup);
-                       total_subpaths =
-                               accumulate_append_subpath(total_subpaths, cheapest_total);
+                       accumulate_append_subpath(cheapest_startup,
+                                                                         &startup_subpaths, NULL);
+                       accumulate_append_subpath(cheapest_total,
+                                                                         &total_subpaths, NULL);
                }
 
                /* ... and build the MergeAppend paths */
@@ -1756,7 +1880,7 @@ get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel,
 
 /*
  * accumulate_append_subpath
- *             Add a subpath to the list being built for an Append or MergeAppend
+ *             Add a subpath to the list being built for an Append or MergeAppend.
  *
  * It's possible that the child is itself an Append or MergeAppend path, in
  * which case we can "cut out the middleman" and just add its child paths to
@@ -1767,26 +1891,53 @@ get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel,
  * omitting a sort step, which seems fine: if the parent is to be an Append,
  * its result would be unsorted anyway, while if the parent is to be a
  * MergeAppend, there's no point in a separate sort on a child.
+ *
+ * Normally, either path is a partial path and subpaths is a list of partial
+ * paths, or else path is a non-partial plan and subpaths is a list of those.
+ * However, if path is a parallel-aware Append, then we add its partial path
+ * children to subpaths and the rest to special_subpaths.  If the latter is
+ * NULL, we don't flatten the path at all (unless it contains only partial
+ * paths).
  */
-static List *
-accumulate_append_subpath(List *subpaths, Path *path)
+static void
+accumulate_append_subpath(Path *path, List **subpaths, List **special_subpaths)
 {
        if (IsA(path, AppendPath))
        {
                AppendPath *apath = (AppendPath *) path;
 
-               /* list_copy is important here to avoid sharing list substructure */
-               return list_concat(subpaths, list_copy(apath->subpaths));
+               if (!apath->path.parallel_aware || apath->first_partial_path == 0)
+               {
+                       /* list_copy is important here to avoid sharing list substructure */
+                       *subpaths = list_concat(*subpaths, list_copy(apath->subpaths));
+                       return;
+               }
+               else if (special_subpaths != NULL)
+               {
+                       List       *new_special_subpaths;
+
+                       /* Split Parallel Append into partial and non-partial subpaths */
+                       *subpaths = list_concat(*subpaths,
+                                                                       list_copy_tail(apath->subpaths,
+                                                                                                  apath->first_partial_path));
+                       new_special_subpaths =
+                               list_truncate(list_copy(apath->subpaths),
+                                                         apath->first_partial_path);
+                       *special_subpaths = list_concat(*special_subpaths,
+                                                                                       new_special_subpaths);
+
+                       /*
+                        * The children have been pulled up into the two lists, so we
+                        * must not fall through and add the Parallel Append itself too.
+                        */
+                       return;
+               }
+
+               /*
+                * Otherwise this is a Parallel Append with non-partial children and
+                * the caller gave us no list to put them in; fall through and keep
+                * the path as a single, unflattened subpath.
+                */
        }
        else if (IsA(path, MergeAppendPath))
        {
                MergeAppendPath *mpath = (MergeAppendPath *) path;
 
                /* list_copy is important here to avoid sharing list substructure */
-               return list_concat(subpaths, list_copy(mpath->subpaths));
+               *subpaths = list_concat(*subpaths, list_copy(mpath->subpaths));
+               return;
        }
-       else
-               return lappend(subpaths, path);
+
+       /* Not flattened: add the path itself as a single subpath. */
+       *subpaths = lappend(*subpaths, path);
 }
 
 /*
@@ -1809,7 +1960,8 @@ set_dummy_rel_pathlist(RelOptInfo *rel)
        rel->pathlist = NIL;
        rel->partial_pathlist = NIL;
 
-       add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL));
+       add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
+                                                                                         0, false, NIL, -1));
 
        /*
         * We set the cheapest path immediately, to ensure that IS_DUMMY_REL()
index d11bf19e30aa685784df65cead38f3c42f98639b..877827dcb52bbd99958850b811aede22118d1129 100644 (file)
@@ -128,6 +128,7 @@ bool                enable_mergejoin = true;
 bool           enable_hashjoin = true;
 bool           enable_gathermerge = true;
 bool           enable_partition_wise_join = false;
+bool           enable_parallel_append = true;
 
 typedef struct
 {
@@ -160,6 +161,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root,
                                                                 Relids inner_relids,
                                                                 SpecialJoinInfo *sjinfo,
                                                                 List **restrictlist);
+static Cost append_nonpartial_cost(List *subpaths, int numpaths,
+                                          int parallel_workers);
 static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
 static double relation_byte_size(double tuples, int width);
 static double page_size(double tuples, int width);
@@ -1741,6 +1744,167 @@ cost_sort(Path *path, PlannerInfo *root,
        path->total_cost = startup_cost + run_cost;
 }
 
+/*
+ * append_nonpartial_cost
+ *       Estimate the cost of the non-partial paths in a Parallel Append.
+ *       The non-partial paths are assumed to be the first "numpaths" paths
+ *       from the subpaths list, and to be in order of decreasing cost.
+ *
+ * The paths are dealt out to at most "parallel_workers" slots: the first
+ * few paths get a slot each, and every later non-partial path is charged to
+ * whichever slot has the lowest accumulated cost at that point.  The result
+ * is the largest accumulated slot cost, or 0 if numpaths is 0.
+ */
+static Cost
+append_nonpartial_cost(List *subpaths, int numpaths, int parallel_workers)
+{
+       Cost       *costarr;
+       int                     arrlen;
+       ListCell   *l;
+       ListCell   *cell;
+       int                     i;
+       int                     path_index;
+       int                     min_index;
+       int                     max_index;
+
+       if (numpaths == 0)
+               return 0;
+
+       /* Without any workers, costarr would be empty and min_index invalid. */
+       Assert(parallel_workers > 0);
+
+       /*
+        * Array length is number of workers or number of relevant paths,
+        * whichever is less.
+        */
+       arrlen = Min(parallel_workers, numpaths);
+       costarr = (Cost *) palloc(sizeof(Cost) * arrlen);
+
+       /* The first few paths will each be claimed by a different worker. */
+       path_index = 0;
+       foreach(cell, subpaths)
+       {
+               Path       *subpath = (Path *) lfirst(cell);
+
+               if (path_index == arrlen)
+                       break;
+               costarr[path_index++] = subpath->total_cost;
+       }
+
+       /*
+        * Since subpaths are sorted by decreasing cost, the last one will have
+        * the minimum cost.
+        */
+       min_index = arrlen - 1;
+
+       /*
+        * For each of the remaining subpaths, add its cost to the array element
+        * with minimum cost.  We resume from "cell", where the loop above
+        * stopped.
+        */
+       for_each_cell(l, cell)
+       {
+               Path       *subpath = (Path *) lfirst(l);
+
+               /* Consider only the non-partial paths */
+               if (path_index++ == numpaths)
+                       break;
+
+               costarr[min_index] += subpath->total_cost;
+
+               /* Update the new min cost array index */
+               for (min_index = i = 0; i < arrlen; i++)
+               {
+                       if (costarr[i] < costarr[min_index])
+                               min_index = i;
+               }
+       }
+
+       /* Return the highest cost from the array */
+       for (max_index = i = 0; i < arrlen; i++)
+       {
+               if (costarr[i] > costarr[max_index])
+                       max_index = i;
+       }
+
+       return costarr[max_index];
+}
+
+/*
+ * cost_append
+ *       Determines and returns the cost of an Append node.
+ *
+ * We charge nothing extra for the Append itself, which perhaps is too
+ * optimistic, but since it doesn't do any selection or projection, it is a
+ * pretty cheap node.
+ *
+ * The results are stored in apath->path.startup_cost, total_cost and rows;
+ * all three are reset first, so they stay zero for an Append with no
+ * subpaths.
+ */
+void
+cost_append(AppendPath *apath)
+{
+       ListCell   *l;
+
+       apath->path.startup_cost = 0;
+       apath->path.total_cost = 0;
+       /* rows is accumulated with += below, so it must start from zero too */
+       apath->path.rows = 0;
+
+       if (apath->subpaths == NIL)
+               return;
+
+       if (!apath->path.parallel_aware)
+       {
+               Path       *firstpath = (Path *) linitial(apath->subpaths);
+
+               /*
+                * Startup cost of non-parallel-aware Append is the startup cost of
+                * first subpath.
+                */
+               apath->path.startup_cost = firstpath->startup_cost;
+
+               /* Compute rows and costs as sums of subplan rows and costs. */
+               foreach(l, apath->subpaths)
+               {
+                       Path       *subpath = (Path *) lfirst(l);
+
+                       apath->path.rows += subpath->rows;
+                       apath->path.total_cost += subpath->total_cost;
+               }
+       }
+       else                                            /* parallel-aware */
+       {
+               int                     i = 0;
+               double          parallel_divisor = get_parallel_divisor(&apath->path);
+
+               /* Calculate startup cost. */
+               foreach(l, apath->subpaths)
+               {
+                       Path       *subpath = (Path *) lfirst(l);
+
+                       /*
+                        * Append will start returning tuples when the child node having
+                        * lowest startup cost is done setting up. We consider only the
+                        * first few subplans that immediately get a worker assigned.
+                        */
+                       if (i == 0)
+                               apath->path.startup_cost = subpath->startup_cost;
+                       else if (i < apath->path.parallel_workers)
+                               apath->path.startup_cost = Min(apath->path.startup_cost,
+                                                                                          subpath->startup_cost);
+
+                       /*
+                        * Apply parallel divisor to non-partial subpaths.  Also add the
+                        * cost of partial paths to the total cost, but ignore non-partial
+                        * paths for now.
+                        */
+                       if (i < apath->first_partial_path)
+                               apath->path.rows += subpath->rows / parallel_divisor;
+                       else
+                       {
+                               apath->path.rows += subpath->rows;
+                               apath->path.total_cost += subpath->total_cost;
+                       }
+
+                       i++;
+               }
+
+               /* Add cost for non-partial subpaths. */
+               apath->path.total_cost +=
+                       append_nonpartial_cost(apath->subpaths,
+                                                                  apath->first_partial_path,
+                                                                  apath->path.parallel_workers);
+       }
+}
+
 /*
  * cost_merge_append
  *       Determines and returns the cost of a MergeAppend node.
index 453f25964ac64b92d90d087e3f07a3f9f5398496..5e03f8bc213999bf0b78086f1397e663ef1e522b 100644 (file)
@@ -1232,7 +1232,8 @@ mark_dummy_rel(RelOptInfo *rel)
        rel->partial_pathlist = NIL;
 
        /* Set up the dummy path */
-       add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL));
+       add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
+                                                                                         0, false, NIL, -1));
 
        /* Set or update cheapest_total_path and related fields */
        set_cheapest(rel);
index d4454779ee94d71899cfd1579713bc9383c1743e..f6c83d0477ca5b262f5e251ad808d48052354e37 100644 (file)
@@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
                                                 Index scanrelid, char *enrname);
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
                                   Index scanrelid, int wtParam);
-static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels);
+static Append *make_append(List *appendplans, int first_partial_plan,
+                       List *tlist, List *partitioned_rels);
 static RecursiveUnion *make_recursive_union(List *tlist,
                                         Plan *lefttree,
                                         Plan *righttree,
@@ -1059,7 +1060,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
         * parent-rel Vars it'll be asked to emit.
         */
 
-       plan = make_append(subplans, tlist, best_path->partitioned_rels);
+       plan = make_append(subplans, best_path->first_partial_path,
+                                          tlist, best_path->partitioned_rels);
 
        copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -5294,7 +5296,8 @@ make_foreignscan(List *qptlist,
 }
 
 static Append *
-make_append(List *appendplans, List *tlist, List *partitioned_rels)
+make_append(List *appendplans, int first_partial_plan,
+                       List *tlist, List *partitioned_rels)
 {
        Append     *node = makeNode(Append);
        Plan       *plan = &node->plan;
@@ -5305,6 +5308,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels)
        plan->righttree = NULL;
        node->partitioned_rels = partitioned_rels;
        node->appendplans = appendplans;
+       node->first_partial_plan = first_partial_plan;
 
        return node;
 }
index ef2eaeac0a48be97d193d397fa5579dc14139934..e8bc15c35d20b0a7194e2c03c1317323281cbdcd 100644 (file)
@@ -3680,9 +3680,12 @@ create_grouping_paths(PlannerInfo *root,
                        path = (Path *)
                                create_append_path(grouped_rel,
                                                                   paths,
+                                                                  NIL,
                                                                   NULL,
                                                                   0,
-                                                                  NIL);
+                                                                  false,
+                                                                  NIL,
+                                                                  -1);
                        path->pathtarget = target;
                }
                else
index f620243ab440e8c3af0a7a2d16dde420010aaffb..a24e8acfa6c34a22b15e383613e70eb8da89f1e6 100644 (file)
@@ -590,8 +590,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root,
        /*
         * Append the child results together.
         */
-       path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL);
-
+       path = (Path *) create_append_path(result_rel, pathlist, NIL,
+                                                                          NULL, 0, false, NIL, -1);
        /* We have to manually jam the right tlist into the path; ick */
        path->pathtarget = create_pathtarget(root, tlist);
 
@@ -702,7 +702,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root,
        /*
         * Append the child results together.
         */
-       path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL);
+       path = (Path *) create_append_path(result_rel, pathlist, NIL,
+                                                                          NULL, 0, false, NIL, -1);
 
        /* We have to manually jam the right tlist into the path; ick */
        path->pathtarget = create_pathtarget(root, tlist);
index bc0841bf0b804134d079fd3bcbfae3cca944ea1b..54126fbb6a5447257a24d3b6c7bd98a6889d0490 100644 (file)
@@ -51,6 +51,8 @@ typedef enum
 #define STD_FUZZ_FACTOR 1.01
 
 static List *translate_sub_tlist(List *tlist, int relid);
+static int     append_total_cost_compare(const void *a, const void *b);
+static int     append_startup_cost_compare(const void *a, const void *b);
 static List *reparameterize_pathlist_by_child(PlannerInfo *root,
                                                                 List *pathlist,
                                                                 RelOptInfo *child_rel);
@@ -1208,44 +1210,50 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  * Note that we must handle subpaths = NIL, representing a dummy access path.
  */
 AppendPath *
-create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
-                                  int parallel_workers, List *partitioned_rels)
+create_append_path(RelOptInfo *rel,
+                                  List *subpaths, List *partial_subpaths,
+                                  Relids required_outer,
+                                  int parallel_workers, bool parallel_aware,
+                                  List *partitioned_rels, double rows)
 {
        AppendPath *pathnode = makeNode(AppendPath);
        ListCell   *l;
 
+       Assert(!parallel_aware || parallel_workers > 0);
+
        pathnode->path.pathtype = T_Append;
        pathnode->path.parent = rel;
        pathnode->path.pathtarget = rel->reltarget;
        pathnode->path.param_info = get_appendrel_parampathinfo(rel,
                                                                                                                        required_outer);
-       pathnode->path.parallel_aware = false;
+       pathnode->path.parallel_aware = parallel_aware;
        pathnode->path.parallel_safe = rel->consider_parallel;
        pathnode->path.parallel_workers = parallel_workers;
        pathnode->path.pathkeys = NIL;  /* result is always considered unsorted */
        pathnode->partitioned_rels = list_copy(partitioned_rels);
-       pathnode->subpaths = subpaths;
 
        /*
-        * We don't bother with inventing a cost_append(), but just do it here.
-        *
-        * Compute rows and costs as sums of subplan rows and costs.  We charge
-        * nothing extra for the Append itself, which perhaps is too optimistic,
-        * but since it doesn't do any selection or projection, it is a pretty
-        * cheap node.
+        * For parallel append, non-partial paths are sorted by descending total
+        * costs. That way, the total time to finish all non-partial paths is
+        * minimized.  Also, the partial paths are sorted by descending startup
+        * costs.  Some paths may require startup work to be done by a single
+        * worker.  In that case, it's better for workers to choose the
+        * expensive ones first, whereas the leader should choose the cheapest
+        * startup plan.
         */
-       pathnode->path.rows = 0;
-       pathnode->path.startup_cost = 0;
-       pathnode->path.total_cost = 0;
+       if (pathnode->path.parallel_aware)
+       {
+               subpaths = list_qsort(subpaths, append_total_cost_compare);
+               partial_subpaths = list_qsort(partial_subpaths,
+                                                                         append_startup_cost_compare);
+       }
+       pathnode->first_partial_path = list_length(subpaths);
+       pathnode->subpaths = list_concat(subpaths, partial_subpaths);
+
        foreach(l, subpaths)
        {
                Path       *subpath = (Path *) lfirst(l);
 
-               pathnode->path.rows += subpath->rows;
-
-               if (l == list_head(subpaths))   /* first node? */
-                       pathnode->path.startup_cost = subpath->startup_cost;
-               pathnode->path.total_cost += subpath->total_cost;
                pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
                        subpath->parallel_safe;
 
@@ -1253,9 +1261,53 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
                Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));
        }
 
+       Assert(!parallel_aware || pathnode->path.parallel_safe);
+
+       cost_append(pathnode);
+
+       /* If the caller provided a row estimate, override the computed value. */
+       if (rows >= 0)
+               pathnode->path.rows = rows;
+
        return pathnode;
 }
 
+/*
+ * append_total_cost_compare
+ *       list_qsort comparator that orders append child paths by total_cost,
+ *       most expensive first
+ */
+static int
+append_total_cost_compare(const void *a, const void *b)
+{
+       Path       *lhs = (Path *) lfirst(*(ListCell **) a);
+       Path       *rhs = (Path *) lfirst(*(ListCell **) b);
+       Cost            lhs_cost = lhs->total_cost;
+       Cost            rhs_cost = rhs->total_cost;
+
+       /* Descending order: the costlier path sorts earlier. */
+       if (lhs_cost > rhs_cost)
+               return -1;
+
+       return (lhs_cost < rhs_cost) ? 1 : 0;
+}
+
+/*
+ * append_startup_cost_compare
+ *       list_qsort comparator that orders append child paths by startup_cost,
+ *       most expensive first
+ */
+static int
+append_startup_cost_compare(const void *a, const void *b)
+{
+       Path       *lhs = (Path *) lfirst(*(ListCell **) a);
+       Path       *rhs = (Path *) lfirst(*(ListCell **) b);
+       Cost            lhs_cost = lhs->startup_cost;
+       Cost            rhs_cost = rhs->startup_cost;
+
+       /* Descending order: the path with the higher startup cost sorts earlier. */
+       if (lhs_cost > rhs_cost)
+               return -1;
+
+       return (lhs_cost < rhs_cost) ? 1 : 0;
+}
+
 /*
  * create_merge_append_path
  *       Creates a path corresponding to a MergeAppend plan, returning the
index e5c3e867099f8bc96af658bcc581bcae94bd00c2..46f5c4277d4acdffc8a3d2468cb6a45a0f33e35b 100644 (file)
@@ -517,6 +517,7 @@ RegisterLWLockTranches(void)
        LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
                                                  "session_typmod_table");
        LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
+       LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
 
        /* Register named tranches. */
        for (i = 0; i < NamedLWLockTrancheRequests; i++)
index 6dcd738be649f44c9ff2260e0e35705d0a0cd8a9..0f7a96d85c3904169861798c9cb0e04206ef3a7c 100644 (file)
@@ -920,6 +920,15 @@ static struct config_bool ConfigureNamesBool[] =
                false,
                NULL, NULL, NULL
        },
+       {
+               {"enable_parallel_append", PGC_USERSET, QUERY_TUNING_METHOD,
+                       gettext_noop("Enables the planner's use of parallel append plans."),
+                       NULL
+               },
+               &enable_parallel_append,
+               true,
+               NULL, NULL, NULL
+       },
 
        {
                {"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
index 16ffbbeea8146e445189903708318cc27b45eb25..842cf3601aa6f7cf3e8ceab150ee570bdb9f2c4c 100644 (file)
 #enable_material = on
 #enable_mergejoin = on
 #enable_nestloop = on
+#enable_parallel_append = on
 #enable_seqscan = on
 #enable_sort = on
 #enable_tidscan = on
index 4e38a1380e2507f7af0cba5a8ebdafbd3bc3c0e9..d42d50614c8f811ffcf98e7fbc5e23d7e95e3f77 100644 (file)
 #ifndef NODEAPPEND_H
 #define NODEAPPEND_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags);
 extern void ExecEndAppend(AppendState *node);
 extern void ExecReScanAppend(AppendState *node);
+extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt);
+extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
+extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
+extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
 
 #endif                                                 /* NODEAPPEND_H */
index 084d59ef834af0a218894c448ea6463f42a277f4..1a35c5c9ada13789f1d622f8efbfbdf62095bc4c 100644 (file)
@@ -21,6 +21,7 @@
 #include "lib/pairingheap.h"
 #include "nodes/params.h"
 #include "nodes/plannodes.h"
+#include "storage/spin.h"
 #include "utils/hsearch.h"
 #include "utils/queryenvironment.h"
 #include "utils/reltrigger.h"
@@ -1000,13 +1001,22 @@ typedef struct ModifyTableState
  *             whichplan               which plan is being executed (0 .. n-1)
  * ----------------
  */
-typedef struct AppendState
+
+/*
+ * The typedefs are declared ahead of the struct body so that the
+ * choose_next_subplan callback below can take an AppendState pointer.
+ * ParallelAppendState is only forward-declared here; its definition is
+ * not part of this header.
+ */
+struct AppendState;
+typedef struct AppendState AppendState;
+struct ParallelAppendState;
+typedef struct ParallelAppendState ParallelAppendState;
+
+struct AppendState
 {
        PlanState       ps;                             /* its first field is NodeTag */
        PlanState **appendplans;        /* array of PlanStates for my inputs */
        int                     as_nplans;
        int                     as_whichplan;
-} AppendState;
+       ParallelAppendState *as_pstate; /* parallel coordination info */
+       Size            pstate_len;             /* size of parallel coordination info */
+       /*
+        * Callback taking this AppendState.  NOTE(review): presumably selects
+        * the next subplan to run, with the bool result saying whether one was
+        * found -- confirm against nodeAppend.c.
+        */
+       bool            (*choose_next_subplan) (AppendState *);
+};
 
 /* ----------------
  *      MergeAppendState information
index 667d5e269cc516dd600df33d76461fd801364ace..711db925769f3a9ddb1d4301200f7a709d764729 100644 (file)
@@ -269,6 +269,9 @@ extern void list_free_deep(List *list);
 extern List *list_copy(const List *list);
 extern List *list_copy_tail(const List *list, int nskip);
 
+typedef int (*list_qsort_comparator) (const void *a, const void *b);
+extern List *list_qsort(const List *list, list_qsort_comparator cmp);
+
 /*
  * To ease migration to the new list API, a set of compatibility
  * macros are provided that reduce the impact of the list API changes
index 9b38d44ba0293b54e5e4e7fa34e1a7333ece66ed..02fb366680f8111d4314942a440866b6f71642ff 100644 (file)
@@ -248,6 +248,7 @@ typedef struct Append
        /* RT indexes of non-leaf tables in a partition tree */
        List       *partitioned_rels;
        List       *appendplans;
+       int                     first_partial_plan;
 } Append;
 
 /* ----------------
index 51df8e9741565080d24725724e5ff2087aef7d42..1108b6a0ea03c3259e004484b29d9c43a3436640 100644 (file)
@@ -1255,6 +1255,9 @@ typedef struct CustomPath
  * AppendPath represents an Append plan, ie, successive execution of
  * several member plans.
  *
+ * For partial Append, 'subpaths' contains non-partial subpaths followed by
+ * partial subpaths.
+ *
  * Note: it is possible for "subpaths" to contain only one, or even no,
  * elements.  These cases are optimized during create_append_plan.
  * In particular, an AppendPath with no subpaths is a "dummy" path that
@@ -1266,6 +1269,9 @@ typedef struct AppendPath
        /* RT indexes of non-leaf tables in a partition tree */
        List       *partitioned_rels;
        List       *subpaths;           /* list of component Paths */
+
+       /* Index of first partial path in subpaths */
+       int                     first_partial_path;
 } AppendPath;
 
 #define IS_DUMMY_PATH(p) \
index 6c2317df3977f322848add722cb8eceacd99959a..5a1fbf97c38916e7bca754ba4f701df20c422c5e 100644 (file)
@@ -68,6 +68,7 @@ extern bool enable_mergejoin;
 extern bool enable_hashjoin;
 extern bool enable_gathermerge;
 extern bool enable_partition_wise_join;
+extern bool enable_parallel_append;
 extern int     constraint_exclusion;
 
 extern double clamp_row_est(double nrows);
@@ -106,6 +107,7 @@ extern void cost_sort(Path *path, PlannerInfo *root,
                  List *pathkeys, Cost input_cost, double tuples, int width,
                  Cost comparison_cost, int sort_mem,
                  double limit_tuples);
+extern void cost_append(AppendPath *path);
 extern void cost_merge_append(Path *path, PlannerInfo *root,
                                  List *pathkeys, int n_streams,
                                  Cost input_startup_cost, Cost input_total_cost,
index e9ed16ad32138192420499c3148599b0c001ac77..99f65b44f223ce6caa39cb26943fb9505c759b87 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef PATHNODE_H
 #define PATHNODE_H
 
+#include "nodes/bitmapset.h"
 #include "nodes/relation.h"
 
 
@@ -63,9 +64,11 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root,
                                          List *bitmapquals);
 extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
                                        List *tidquals, Relids required_outer);
-extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths,
-                                  Relids required_outer, int parallel_workers,
-                                  List *partitioned_rels);
+extern AppendPath *create_append_path(RelOptInfo *rel,
+                                  List *subpaths, List *partial_subpaths,
+                                  Relids required_outer,
+                                  int parallel_workers, bool parallel_aware,
+                                  List *partitioned_rels, double rows);
 extern MergeAppendPath *create_merge_append_path(PlannerInfo *root,
                                                 RelOptInfo *rel,
                                                 List *subpaths,
index 596fdadc6323abcfbe2d543a16bb03889f6f315c..460843d73e2d25c7a2efb1b31fd0afbc786cfc81 100644 (file)
@@ -216,6 +216,7 @@ typedef enum BuiltinTrancheIds
        LWTRANCHE_SESSION_RECORD_TABLE,
        LWTRANCHE_SESSION_TYPMOD_TABLE,
        LWTRANCHE_TBM,
+       LWTRANCHE_PARALLEL_APPEND,
        LWTRANCHE_FIRST_USER_DEFINED
 }                      BuiltinTrancheIds;
 
index fac7b62f9c99f6dffcd9064af7a82c9dadd993b0..a79f891da72c30a27b2e1aa2b47d4f9245a85937 100644 (file)
@@ -1404,6 +1404,7 @@ select min(1-id) from matest0;
 
 reset enable_indexscan;
 set enable_seqscan = off;  -- plan with fewest seqscans should be merge
+set enable_parallel_append = off; -- Don't let parallel-append interfere
 explain (verbose, costs off) select * from matest0 order by 1-id;
                             QUERY PLAN                            
 ------------------------------------------------------------------
@@ -1470,6 +1471,7 @@ select min(1-id) from matest0;
 (1 row)
 
 reset enable_seqscan;
+reset enable_parallel_append;
 drop table matest0 cascade;
 NOTICE:  drop cascades to 3 other objects
 DETAIL:  drop cascades to table matest1
index b748c98c9156a749c5b78b5db874f0f9ad01a484..62ed719cccba1216586e7d39242e4e5d3444d031 100644 (file)
@@ -11,8 +11,88 @@ set parallel_setup_cost=0;
 set parallel_tuple_cost=0;
 set min_parallel_table_scan_size=0;
 set max_parallel_workers_per_gather=4;
+-- Parallel Append with partial-subplans
 explain (costs off)
-  select count(*) from a_star;
+  select round(avg(aa)), sum(aa) from a_star;
+                     QUERY PLAN                      
+-----------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 3
+         ->  Partial Aggregate
+               ->  Parallel Append
+                     ->  Parallel Seq Scan on a_star
+                     ->  Parallel Seq Scan on b_star
+                     ->  Parallel Seq Scan on c_star
+                     ->  Parallel Seq Scan on d_star
+                     ->  Parallel Seq Scan on e_star
+                     ->  Parallel Seq Scan on f_star
+(11 rows)
+
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum 
+-------+-----
+    14 | 355
+(1 row)
+
+-- Parallel Append with both partial and non-partial subplans
+alter table c_star set (parallel_workers = 0);
+alter table d_star set (parallel_workers = 0);
+explain (costs off)
+  select round(avg(aa)), sum(aa) from a_star;
+                     QUERY PLAN                      
+-----------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 3
+         ->  Partial Aggregate
+               ->  Parallel Append
+                     ->  Seq Scan on d_star
+                     ->  Seq Scan on c_star
+                     ->  Parallel Seq Scan on a_star
+                     ->  Parallel Seq Scan on b_star
+                     ->  Parallel Seq Scan on e_star
+                     ->  Parallel Seq Scan on f_star
+(11 rows)
+
+-- Parallel Append with only non-partial subplans
+alter table a_star set (parallel_workers = 0);
+alter table b_star set (parallel_workers = 0);
+alter table e_star set (parallel_workers = 0);
+alter table f_star set (parallel_workers = 0);
+explain (costs off)
+  select round(avg(aa)), sum(aa) from a_star;
+                 QUERY PLAN                 
+--------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 3
+         ->  Partial Aggregate
+               ->  Parallel Append
+                     ->  Seq Scan on d_star
+                     ->  Seq Scan on f_star
+                     ->  Seq Scan on e_star
+                     ->  Seq Scan on b_star
+                     ->  Seq Scan on c_star
+                     ->  Seq Scan on a_star
+(11 rows)
+
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum 
+-------+-----
+    14 | 355
+(1 row)
+
+-- Disable Parallel Append
+alter table a_star reset (parallel_workers);
+alter table b_star reset (parallel_workers);
+alter table c_star reset (parallel_workers);
+alter table d_star reset (parallel_workers);
+alter table e_star reset (parallel_workers);
+alter table f_star reset (parallel_workers);
+set enable_parallel_append to off;
+explain (costs off)
+  select round(avg(aa)), sum(aa) from a_star;
                      QUERY PLAN                      
 -----------------------------------------------------
  Finalize Aggregate
@@ -28,12 +108,13 @@ explain (costs off)
                      ->  Parallel Seq Scan on f_star
 (11 rows)
 
-select count(*) from a_star;
- count 
--------
-    50
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum 
+-------+-----
+    14 | 355
 (1 row)
 
+reset enable_parallel_append;
 -- test with leader participation disabled
 set parallel_leader_participation = off;
 explain (costs off)
index cd1f7f301d44bee4799fddeefd70acd0e9505e97..2b738aae7c50bd5d77b2c955ccd7b101684bc6eb 100644 (file)
@@ -81,11 +81,12 @@ select name, setting from pg_settings where name like 'enable%';
  enable_material            | on
  enable_mergejoin           | on
  enable_nestloop            | on
+ enable_parallel_append     | on
  enable_partition_wise_join | off
  enable_seqscan             | on
  enable_sort                | on
  enable_tidscan             | on
-(13 rows)
+(14 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
index c71febffc2ed725a97b565e7b8ab19693f40755f..2e42ae115d39b39eaedc796d41eda21555e7f90c 100644 (file)
@@ -508,11 +508,13 @@ select min(1-id) from matest0;
 reset enable_indexscan;
 
 set enable_seqscan = off;  -- plan with fewest seqscans should be merge
+set enable_parallel_append = off; -- Don't let parallel-append interfere
 explain (verbose, costs off) select * from matest0 order by 1-id;
 select * from matest0 order by 1-id;
 explain (verbose, costs off) select min(1-id) from matest0;
 select min(1-id) from matest0;
 reset enable_seqscan;
+reset enable_parallel_append;
 
 drop table matest0 cascade;
 
index 00df92779c6b52907e0784e449c74074b191af86..d3f2028468d6f0cdaa82a0653d04979e38aec6b2 100644 (file)
@@ -15,9 +15,38 @@ set parallel_tuple_cost=0;
 set min_parallel_table_scan_size=0;
 set max_parallel_workers_per_gather=4;
 
+-- Parallel Append with partial-subplans
 explain (costs off)
-  select count(*) from a_star;
-select count(*) from a_star;
+  select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+
+-- Parallel Append with both partial and non-partial subplans
+alter table c_star set (parallel_workers = 0);
+alter table d_star set (parallel_workers = 0);
+explain (costs off)
+  select round(avg(aa)), sum(aa) from a_star;
+
+-- Parallel Append with only non-partial subplans
+alter table a_star set (parallel_workers = 0);
+alter table b_star set (parallel_workers = 0);
+alter table e_star set (parallel_workers = 0);
+alter table f_star set (parallel_workers = 0);
+explain (costs off)
+  select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+
+-- Disable Parallel Append
+alter table a_star reset (parallel_workers);
+alter table b_star reset (parallel_workers);
+alter table c_star reset (parallel_workers);
+alter table d_star reset (parallel_workers);
+alter table e_star reset (parallel_workers);
+alter table f_star reset (parallel_workers);
+set enable_parallel_append to off;
+explain (costs off)
+  select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+reset enable_parallel_append;
 
 -- test with leader participation disabled
 set parallel_leader_participation = off;