From: Robert Haas Date: Tue, 5 Dec 2017 22:28:39 +0000 (-0500) Subject: Support Parallel Append plan nodes. X-Git-Tag: REL_11_BETA1~1107 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=ab72716778128fb63d54ac256adf7fe6820a1185;p=postgresql Support Parallel Append plan nodes. When we create an Append node, we can spread out the workers over the subplans instead of piling on to each subplan one at a time, which should typically be a bit more efficient, both because the startup cost of any plan executed entirely by one worker is paid only once and also because of reduced contention. We can also construct Append plans using a mix of partial and non-partial subplans, which may allow for parallelism in places that otherwise couldn't support it. Unfortunately, this patch doesn't handle the important case of parallelizing UNION ALL by running each branch in a separate worker; the executor infrastructure is added here, but more planner work is needed. Amit Khandekar, Robert Haas, Amul Sul, reviewed and tested by Ashutosh Bapat, Amit Langote, Rafia Sabih, Amit Kapila, and Rajkumar Raghuwanshi. Discussion: http://postgr.es/m/CAJ3gD9dy0K_E8r727heqXoBmWZ83HwLFwdcaSSmBQ1+S+vRuUQ@mail.gmail.com --- diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 563ad1fc7f..9ae6861cd7 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3633,6 +3633,20 @@ ANY num_sync ( + enable_parallel_append (boolean) + + enable_parallel_append configuration parameter + + + + + Enables or disables the query planner's use of parallel-aware + append plan types. The default is on. + + + + enable_partition_wise_join (boolean) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 8d461c8145..b6f80d9708 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -845,7 +845,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser - LWLock + LWLock ShmemIndexLock Waiting to find or allocate space in shared memory. @@ -1116,6 +1116,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser tbm Waiting for TBM shared iterator lock. + + parallel_append + Waiting to choose the next subplan during Parallel Append plan + execution. + Lock relation diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ff5cff59b0..558cb08b07 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -26,6 +26,7 @@ #include "executor/execExpr.h" #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeAppend.h" #include "executor/nodeBitmapHeapscan.h" #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" @@ -250,6 +251,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecForeignScanEstimate((ForeignScanState *) planstate, e->pcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendEstimate((AppendState *) planstate, + e->pcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanEstimate((CustomScanState *) planstate, @@ -453,6 +459,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecForeignScanInitializeDSM((ForeignScanState *) planstate, d->pcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendInitializeDSM((AppendState *) planstate, + d->pcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanInitializeDSM((CustomScanState *) planstate, @@ -884,6 +895,10 @@ ExecParallelReInitializeDSM(PlanState *planstate, ExecForeignScanReInitializeDSM((ForeignScanState *) planstate, pcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendReInitializeDSM((AppendState *) planstate, pcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanReInitializeDSM((CustomScanState *) planstate, @@ -1194,6 +1209,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecForeignScanInitializeWorker((ForeignScanState *) planstate, pwcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendInitializeWorker((AppendState *) planstate, pwcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanInitializeWorker((CustomScanState *) planstate, diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 1d2fb35d55..246a0b2d85 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -61,51 +61,27 @@ #include "executor/nodeAppend.h" #include "miscadmin.h" -static TupleTableSlot *ExecAppend(PlanState *pstate); -static bool exec_append_initialize_next(AppendState *appendstate); - - -/* ---------------------------------------------------------------- - * exec_append_initialize_next - * - * Sets up the append state node for the "next" scan. - * - * Returns t iff there is a "next" scan to process. - * ---------------------------------------------------------------- - */ -static bool -exec_append_initialize_next(AppendState *appendstate) +/* Shared state for parallel-aware Append. */ +struct ParallelAppendState { - int whichplan; + LWLock pa_lock; /* mutual exclusion to choose next subplan */ + int pa_next_plan; /* next plan to choose by any worker */ /* - * get information from the append node + * pa_finished[i] should be true if no more workers should select subplan + * i. for a non-partial plan, this should be set to true as soon as a + * worker selects the plan; for a partial plan, it remains false until + * some worker executes the plan to completion. */ - whichplan = appendstate->as_whichplan; + bool pa_finished[FLEXIBLE_ARRAY_MEMBER]; +}; - if (whichplan < 0) - { - /* - * if scanning in reverse, we start at the last scan in the list and - * then proceed back to the first.. in any case we inform ExecAppend - * that we are at the end of the line by returning false - */ - appendstate->as_whichplan = 0; - return false; - } - else if (whichplan >= appendstate->as_nplans) - { - /* - * as above, end the scan if we go beyond the last scan in our list.. - */ - appendstate->as_whichplan = appendstate->as_nplans - 1; - return false; - } - else - { - return true; - } -} +#define INVALID_SUBPLAN_INDEX -1 + +static TupleTableSlot *ExecAppend(PlanState *pstate); +static bool choose_next_subplan_locally(AppendState *node); +static bool choose_next_subplan_for_leader(AppendState *node); +static bool choose_next_subplan_for_worker(AppendState *node); /* ---------------------------------------------------------------- * ExecInitAppend @@ -185,10 +161,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendstate->ps.ps_ProjInfo = NULL; /* - * initialize to scan first subplan + * Parallel-aware append plans must choose the first subplan to execute by + * looking at shared memory, but non-parallel-aware append plans can + * always start with the first subplan. */ - appendstate->as_whichplan = 0; - exec_append_initialize_next(appendstate); + appendstate->as_whichplan = + appendstate->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0; + + /* If parallel-aware, this will be overridden later. */ + appendstate->choose_next_subplan = choose_next_subplan_locally; return appendstate; } @@ -204,6 +185,11 @@ ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); + /* If no subplan has been chosen, we must choose one before proceeding. */ + if (node->as_whichplan == INVALID_SUBPLAN_INDEX && + !node->choose_next_subplan(node)) + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + for (;;) { PlanState *subnode; @@ -214,6 +200,7 @@ ExecAppend(PlanState *pstate) /* * figure out which subplan we are currently processing */ + Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); subnode = node->appendplans[node->as_whichplan]; /* @@ -231,19 +218,9 @@ ExecAppend(PlanState *pstate) return result; } - /* - * Go on to the "next" subplan in the appropriate direction. If no - * more subplans, return the empty slot set up for us by - * ExecInitAppend. - */ - if (ScanDirectionIsForward(node->ps.state->es_direction)) - node->as_whichplan++; - else - node->as_whichplan--; - if (!exec_append_initialize_next(node)) + /* choose new subplan; if none, we're done */ + if (!node->choose_next_subplan(node)) return ExecClearTuple(node->ps.ps_ResultTupleSlot); - - /* Else loop back and try to get a tuple from the new subplan */ } } @@ -298,6 +275,246 @@ ExecReScanAppend(AppendState *node) if (subnode->chgParam == NULL) ExecReScan(subnode); } - node->as_whichplan = 0; - exec_append_initialize_next(node); + + node->as_whichplan = + node->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0; +} + +/* ---------------------------------------------------------------- + * Parallel Append Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecAppendEstimate + * + * Compute the amount of space we'll need in the parallel + * query DSM, and inform pcxt->estimator about our needs. + * ---------------------------------------------------------------- + */ +void +ExecAppendEstimate(AppendState *node, + ParallelContext *pcxt) +{ + node->pstate_len = + add_size(offsetof(ParallelAppendState, pa_finished), + sizeof(bool) * node->as_nplans); + + shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + + +/* ---------------------------------------------------------------- + * ExecAppendInitializeDSM + * + * Set up shared state for Parallel Append. + * ---------------------------------------------------------------- + */ +void +ExecAppendInitializeDSM(AppendState *node, + ParallelContext *pcxt) +{ + ParallelAppendState *pstate; + + pstate = shm_toc_allocate(pcxt->toc, node->pstate_len); + memset(pstate, 0, node->pstate_len); + LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND); + shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate); + + node->as_pstate = pstate; + node->choose_next_subplan = choose_next_subplan_for_leader; +} + +/* ---------------------------------------------------------------- + * ExecAppendReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt) +{ + ParallelAppendState *pstate = node->as_pstate; + + pstate->pa_next_plan = 0; + memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans); +} + +/* ---------------------------------------------------------------- + * ExecAppendInitializeWorker + * + * Copy relevant information from TOC into planstate, and initialize + * whatever is required to choose and execute the optimal subplan. + * ---------------------------------------------------------------- + */ +void +ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) +{ + node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); + node->choose_next_subplan = choose_next_subplan_for_worker; +} + +/* ---------------------------------------------------------------- + * choose_next_subplan_locally + * + * Choose next subplan for a non-parallel-aware Append, + * returning false if there are no more. + * ---------------------------------------------------------------- + */ +static bool +choose_next_subplan_locally(AppendState *node) +{ + int whichplan = node->as_whichplan; + + /* We should never see INVALID_SUBPLAN_INDEX in this case. */ + Assert(whichplan >= 0 && whichplan <= node->as_nplans); + + if (ScanDirectionIsForward(node->ps.state->es_direction)) + { + if (whichplan >= node->as_nplans - 1) + return false; + node->as_whichplan++; + } + else + { + if (whichplan <= 0) + return false; + node->as_whichplan--; + } + + return true; +} + +/* ---------------------------------------------------------------- + * choose_next_subplan_for_leader + * + * Try to pick a plan which doesn't commit us to doing much + * work locally, so that as much work as possible is done in + * the workers. Cheapest subplans are at the end. + * ---------------------------------------------------------------- + */ +static bool +choose_next_subplan_for_leader(AppendState *node) +{ + ParallelAppendState *pstate = node->as_pstate; + Append *append = (Append *) node->ps.plan; + + /* Backward scan is not supported by parallel-aware plans */ + Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + + LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); + + if (node->as_whichplan != INVALID_SUBPLAN_INDEX) + { + /* Mark just-completed subplan as finished. */ + node->as_pstate->pa_finished[node->as_whichplan] = true; + } + else + { + /* Start with last subplan. */ + node->as_whichplan = node->as_nplans - 1; + } + + /* Loop until we find a subplan to execute. */ + while (pstate->pa_finished[node->as_whichplan]) + { + if (node->as_whichplan == 0) + { + pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; + node->as_whichplan = INVALID_SUBPLAN_INDEX; + LWLockRelease(&pstate->pa_lock); + return false; + } + node->as_whichplan--; + } + + /* If non-partial, immediately mark as finished. */ + if (node->as_whichplan < append->first_partial_plan) + node->as_pstate->pa_finished[node->as_whichplan] = true; + + LWLockRelease(&pstate->pa_lock); + + return true; +} + +/* ---------------------------------------------------------------- + * choose_next_subplan_for_worker + * + * Choose next subplan for a parallel-aware Append, returning + * false if there are no more. + * + * We start from the first plan and advance through the list; + * when we get back to the end, we loop back to the first + * nonpartial plan. This assigns the non-partial plans first + * in order of descending cost and then spreads out the + * workers as evenly as possible across the remaining partial + * plans. + * ---------------------------------------------------------------- + */ +static bool +choose_next_subplan_for_worker(AppendState *node) +{ + ParallelAppendState *pstate = node->as_pstate; + Append *append = (Append *) node->ps.plan; + + /* Backward scan is not supported by parallel-aware plans */ + Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + + LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); + + /* Mark just-completed subplan as finished. */ + if (node->as_whichplan != INVALID_SUBPLAN_INDEX) + node->as_pstate->pa_finished[node->as_whichplan] = true; + + /* If all the plans are already done, we have nothing to do */ + if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX) + { + LWLockRelease(&pstate->pa_lock); + return false; + } + + /* Loop until we find a subplan to execute. */ + while (pstate->pa_finished[pstate->pa_next_plan]) + { + if (pstate->pa_next_plan < node->as_nplans - 1) + { + /* Advance to next plan. */ + pstate->pa_next_plan++; + } + else if (append->first_partial_plan < node->as_nplans) + { + /* Loop back to first partial plan. */ + pstate->pa_next_plan = append->first_partial_plan; + } + else + { + /* At last plan, no partial plans, arrange to bail out. */ + pstate->pa_next_plan = node->as_whichplan; + } + + if (pstate->pa_next_plan == node->as_whichplan) + { + /* We've tried everything! */ + pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; + LWLockRelease(&pstate->pa_lock); + return false; + } + } + + /* Pick the plan we found, and advance pa_next_plan one more time. */ + node->as_whichplan = pstate->pa_next_plan++; + if (pstate->pa_next_plan >= node->as_nplans) + { + Assert(append->first_partial_plan < node->as_nplans); + pstate->pa_next_plan = append->first_partial_plan; + } + + /* If non-partial, immediately mark as finished. */ + if (node->as_whichplan < append->first_partial_plan) + node->as_pstate->pa_finished[node->as_whichplan] = true; + + LWLockRelease(&pstate->pa_lock); + + return true; } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index aff9a62106..b1515dd8e1 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -242,6 +242,7 @@ _copyAppend(const Append *from) */ COPY_NODE_FIELD(partitioned_rels); COPY_NODE_FIELD(appendplans); + COPY_SCALAR_FIELD(first_partial_plan); return newnode; } diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c index acaf4b5315..bee6244adc 100644 --- a/src/backend/nodes/list.c +++ b/src/backend/nodes/list.c @@ -1249,6 +1249,44 @@ list_copy_tail(const List *oldlist, int nskip) return newlist; } +/* + * Sort a list using qsort. A sorted list is built but the cells of the + * original list are re-used. The comparator function receives arguments of + * type ListCell ** + */ +List * +list_qsort(const List *list, list_qsort_comparator cmp) +{ + ListCell *cell; + int i; + int len = list_length(list); + ListCell **list_arr; + List *new_list; + + if (len == 0) + return NIL; + + i = 0; + list_arr = palloc(sizeof(ListCell *) * len); + foreach(cell, list) + list_arr[i++] = cell; + + qsort(list_arr, len, sizeof(ListCell *), cmp); + + new_list = (List *) palloc(sizeof(List)); + new_list->type = list->type; + new_list->length = len; + new_list->head = list_arr[0]; + new_list->tail = list_arr[len - 1]; + + for (i = 0; i < len - 1; i++) + list_arr[i]->next = list_arr[i + 1]; + + list_arr[len - 1]->next = NULL; + pfree(list_arr); + return new_list; +} + /* * Temporary compatibility functions * diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index c97ee24ade..b59a5219a7 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -399,6 +399,7 @@ _outAppend(StringInfo str, const Append *node) WRITE_NODE_FIELD(partitioned_rels); WRITE_NODE_FIELD(appendplans); + WRITE_INT_FIELD(first_partial_plan); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 7eb67fc040..0d17ae89b0 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1600,6 +1600,7 @@ _readAppend(void) READ_NODE_FIELD(partitioned_rels); READ_NODE_FIELD(appendplans); + READ_INT_FIELD(first_partial_plan); READ_DONE(); } diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 44f6b03442..47986ba80a 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -101,7 +101,8 @@ static void generate_mergeappend_paths(PlannerInfo *root, RelOptInfo *rel, static Path *get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); -static List *accumulate_append_subpath(List *subpaths, Path *path); +static void accumulate_append_subpath(Path *path, + List **subpaths, List **special_subpaths); static void set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte); static void set_function_pathlist(PlannerInfo *root, RelOptInfo *rel, @@ -1331,13 +1332,17 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, List *subpaths = NIL; bool subpaths_valid = true; List *partial_subpaths = NIL; + List *pa_partial_subpaths = NIL; + List *pa_nonpartial_subpaths = NIL; bool partial_subpaths_valid = true; + bool pa_subpaths_valid = enable_parallel_append; List *all_child_pathkeys = NIL; List *all_child_outers = NIL; ListCell *l; List *partitioned_rels = NIL; RangeTblEntry *rte; bool build_partitioned_rels = false; + double partial_rows = -1; if (IS_SIMPLE_REL(rel)) { @@ -1388,6 +1393,7 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, { RelOptInfo *childrel = lfirst(l); ListCell *lcp; + Path *cheapest_partial_path = NULL; /* * If we need to build partitioned_rels, accumulate the partitioned @@ -1408,18 +1414,69 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, * If not, there's no workable unparameterized path. */ if (childrel->cheapest_total_path->param_info == NULL) - subpaths = accumulate_append_subpath(subpaths, - childrel->cheapest_total_path); + accumulate_append_subpath(childrel->cheapest_total_path, + &subpaths, NULL); else subpaths_valid = false; /* Same idea, but for a partial plan. */ if (childrel->partial_pathlist != NIL) - partial_subpaths = accumulate_append_subpath(partial_subpaths, - linitial(childrel->partial_pathlist)); + { + cheapest_partial_path = linitial(childrel->partial_pathlist); + accumulate_append_subpath(cheapest_partial_path, + &partial_subpaths, NULL); + } else partial_subpaths_valid = false; + /* + * Same idea, but for a parallel append mixing partial and non-partial + * paths. + */ + if (pa_subpaths_valid) + { + Path *nppath = NULL; + + nppath = + get_cheapest_parallel_safe_total_inner(childrel->pathlist); + + if (cheapest_partial_path == NULL && nppath == NULL) + { + /* Neither a partial nor a parallel-safe path? Forget it. */ + pa_subpaths_valid = false; + } + else if (nppath == NULL || + (cheapest_partial_path != NULL && + cheapest_partial_path->total_cost < nppath->total_cost)) + { + /* Partial path is cheaper or the only option. */ + Assert(cheapest_partial_path != NULL); + accumulate_append_subpath(cheapest_partial_path, + &pa_partial_subpaths, + &pa_nonpartial_subpaths); + + } + else + { + /* + * Either we've got only a non-partial path, or we think that + * a single backend can execute the best non-partial path + * faster than all the parallel backends working together can + * execute the best partial path. + * + * It might make sense to be more aggressive here. Even if + * the best non-partial path is more expensive than the best + * partial path, it could still be better to choose the + * non-partial path if there are several such paths that can + * be given to different workers. For now, we don't try to + * figure that out. + */ + accumulate_append_subpath(nppath, + &pa_nonpartial_subpaths, + NULL); + } + } + /* * Collect lists of all the available path orderings and * parameterizations for all the children. We use these as a @@ -1491,11 +1548,13 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, * if we have zero or one live subpath due to constraint exclusion.) */ if (subpaths_valid) - add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0, - partitioned_rels)); + add_path(rel, (Path *) create_append_path(rel, subpaths, NIL, + NULL, 0, false, + partitioned_rels, -1)); /* - * Consider an append of partial unordered, unparameterized partial paths. + * Consider an append of unordered, unparameterized partial paths. Make + * it parallel-aware if possible. */ if (partial_subpaths_valid) { @@ -1503,12 +1562,7 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, ListCell *lc; int parallel_workers = 0; - /* - * Decide on the number of workers to request for this append path. - * For now, we just use the maximum value from among the members. It - * might be useful to use a higher number if the Append node were - * smart enough to spread out the workers, but it currently isn't. - */ + /* Find the highest number of workers requested for any subpath. */ foreach(lc, partial_subpaths) { Path *path = lfirst(lc); @@ -1517,9 +1571,78 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, } Assert(parallel_workers > 0); + /* + * If the use of parallel append is permitted, always request at least + * log2(# of children) paths. We assume it can be useful to have + * extra workers in this case because they will be spread out across + * the children. The precise formula is just a guess, but we don't + * want to end up with a radically different answer for a table with N + * partitions vs. an unpartitioned table with the same data, so the + * use of some kind of log-scaling here seems to make some sense. + */ + if (enable_parallel_append) + { + parallel_workers = Max(parallel_workers, + fls(list_length(live_childrels))); + parallel_workers = Min(parallel_workers, + max_parallel_workers_per_gather); + } + Assert(parallel_workers > 0); + /* Generate a partial append path. */ - appendpath = create_append_path(rel, partial_subpaths, NULL, - parallel_workers, partitioned_rels); + appendpath = create_append_path(rel, NIL, partial_subpaths, NULL, + parallel_workers, + enable_parallel_append, + partitioned_rels, -1); + + /* + * Make sure any subsequent partial paths use the same row count + * estimate. + */ + partial_rows = appendpath->path.rows; + + /* Add the path. */ + add_partial_path(rel, (Path *) appendpath); + } + + /* + * Consider a parallel-aware append using a mix of partial and non-partial + * paths. (This only makes sense if there's at least one child which has + * a non-partial path that is substantially cheaper than any partial path; + * otherwise, we should use the append path added in the previous step.) + */ + if (pa_subpaths_valid && pa_nonpartial_subpaths != NIL) + { + AppendPath *appendpath; + ListCell *lc; + int parallel_workers = 0; + + /* + * Find the highest number of workers requested for any partial + * subpath. + */ + foreach(lc, pa_partial_subpaths) + { + Path *path = lfirst(lc); + + parallel_workers = Max(parallel_workers, path->parallel_workers); + } + + /* + * Same formula here as above. It's even more important in this + * instance because the non-partial paths won't contribute anything to + * the planned number of parallel workers. + */ + parallel_workers = Max(parallel_workers, + fls(list_length(live_childrels))); + parallel_workers = Min(parallel_workers, + max_parallel_workers_per_gather); + Assert(parallel_workers > 0); + + appendpath = create_append_path(rel, pa_nonpartial_subpaths, + pa_partial_subpaths, + NULL, parallel_workers, true, + partitioned_rels, partial_rows); add_partial_path(rel, (Path *) appendpath); } @@ -1567,13 +1690,14 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, subpaths_valid = false; break; } - subpaths = accumulate_append_subpath(subpaths, subpath); + accumulate_append_subpath(subpath, &subpaths, NULL); } if (subpaths_valid) add_path(rel, (Path *) - create_append_path(rel, subpaths, required_outer, 0, - partitioned_rels)); + create_append_path(rel, subpaths, NIL, + required_outer, 0, false, + partitioned_rels, -1)); } } @@ -1657,10 +1781,10 @@ generate_mergeappend_paths(PlannerInfo *root, RelOptInfo *rel, if (cheapest_startup != cheapest_total) startup_neq_total = true; - startup_subpaths = - accumulate_append_subpath(startup_subpaths, cheapest_startup); - total_subpaths = - accumulate_append_subpath(total_subpaths, cheapest_total); + accumulate_append_subpath(cheapest_startup, + &startup_subpaths, NULL); + accumulate_append_subpath(cheapest_total, + &total_subpaths, NULL); } /* ... and build the MergeAppend paths */ @@ -1756,7 +1880,7 @@ get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, /* * accumulate_append_subpath - * Add a subpath to the list being built for an Append or MergeAppend + * Add a subpath to the list being built for an Append or MergeAppend. * * It's possible that the child is itself an Append or MergeAppend path, in * which case we can "cut out the middleman" and just add its child paths to @@ -1767,26 +1891,53 @@ get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, * omitting a sort step, which seems fine: if the parent is to be an Append, * its result would be unsorted anyway, while if the parent is to be a * MergeAppend, there's no point in a separate sort on a child. + * its result would be unsorted anyway. + * + * Normally, either path is a partial path and subpaths is a list of partial + * paths, or else path is a non-partial plan and subpaths is a list of those. + * However, if path is a parallel-aware Append, then we add its partial path + * children to subpaths and the rest to special_subpaths. If the latter is + * NULL, we don't flatten the path at all (unless it contains only partial + * paths). */ -static List * -accumulate_append_subpath(List *subpaths, Path *path) +static void +accumulate_append_subpath(Path *path, List **subpaths, List **special_subpaths) { if (IsA(path, AppendPath)) { AppendPath *apath = (AppendPath *) path; - /* list_copy is important here to avoid sharing list substructure */ - return list_concat(subpaths, list_copy(apath->subpaths)); + if (!apath->path.parallel_aware || apath->first_partial_path == 0) + { + /* list_copy is important here to avoid sharing list substructure */ + *subpaths = list_concat(*subpaths, list_copy(apath->subpaths)); + return; + } + else if (special_subpaths != NULL) + { + List *new_special_subpaths; + + /* Split Parallel Append into partial and non-partial subpaths */ + *subpaths = list_concat(*subpaths, + list_copy_tail(apath->subpaths, + apath->first_partial_path)); + new_special_subpaths = + list_truncate(list_copy(apath->subpaths), + apath->first_partial_path); + *special_subpaths = list_concat(*special_subpaths, + new_special_subpaths); + } } else if (IsA(path, MergeAppendPath)) { MergeAppendPath *mpath = (MergeAppendPath *) path; /* list_copy is important here to avoid sharing list substructure */ - return list_concat(subpaths, list_copy(mpath->subpaths)); + *subpaths = list_concat(*subpaths, list_copy(mpath->subpaths)); + return; } - else - return lappend(subpaths, path); + + *subpaths = lappend(*subpaths, path); } /* @@ -1809,7 +1960,8 @@ set_dummy_rel_pathlist(RelOptInfo *rel) rel->pathlist = NIL; rel->partial_pathlist = NIL; - add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL)); + add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL, + 0, false, NIL, -1)); /* * We set the cheapest path immediately, to ensure that IS_DUMMY_REL() diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index d11bf19e30..877827dcb5 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -128,6 +128,7 @@ bool enable_mergejoin = true; bool enable_hashjoin = true; bool enable_gathermerge = true; bool enable_partition_wise_join = false; +bool enable_parallel_append = true; typedef struct { @@ -160,6 +161,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root, Relids inner_relids, SpecialJoinInfo *sjinfo, List **restrictlist); +static Cost append_nonpartial_cost(List *subpaths, int numpaths, + int parallel_workers); static void set_rel_width(PlannerInfo *root, RelOptInfo *rel); static double relation_byte_size(double tuples, int width); static double page_size(double tuples, int width); @@ -1741,6 +1744,167 @@ cost_sort(Path *path, PlannerInfo *root, path->total_cost = startup_cost + run_cost; } +/* + * append_nonpartial_cost + * Estimate the cost of the non-partial paths in a Parallel Append. + * The non-partial paths are assumed to be the first "numpaths" paths + * from the subpaths list, and to be in order of decreasing cost. + */ +static Cost +append_nonpartial_cost(List *subpaths, int numpaths, int parallel_workers) +{ + Cost *costarr; + int arrlen; + ListCell *l; + ListCell *cell; + int i; + int path_index; + int min_index; + int max_index; + + if (numpaths == 0) + return 0; + + /* + * Array length is number of workers or number of relevants paths, + * whichever is less. + */ + arrlen = Min(parallel_workers, numpaths); + costarr = (Cost *) palloc(sizeof(Cost) * arrlen); + + /* The first few paths will each be claimed by a different worker. */ + path_index = 0; + foreach(cell, subpaths) + { + Path *subpath = (Path *) lfirst(cell); + + if (path_index == arrlen) + break; + costarr[path_index++] = subpath->total_cost; + } + + /* + * Since subpaths are sorted by decreasing cost, the last one will have + * the minimum cost. + */ + min_index = arrlen - 1; + + /* + * For each of the remaining subpaths, add its cost to the array element + * with minimum cost. + */ + for_each_cell(l, cell) + { + Path *subpath = (Path *) lfirst(l); + int i; + + /* Consider only the non-partial paths */ + if (path_index++ == numpaths) + break; + + costarr[min_index] += subpath->total_cost; + + /* Update the new min cost array index */ + for (min_index = i = 0; i < arrlen; i++) + { + if (costarr[i] < costarr[min_index]) + min_index = i; + } + } + + /* Return the highest cost from the array */ + for (max_index = i = 0; i < arrlen; i++) + { + if (costarr[i] > costarr[max_index]) + max_index = i; + } + + return costarr[max_index]; +} + +/* + * cost_append + * Determines and returns the cost of an Append node. + * + * We charge nothing extra for the Append itself, which perhaps is too + * optimistic, but since it doesn't do any selection or projection, it is a + * pretty cheap node. + */ +void +cost_append(AppendPath *apath) +{ + ListCell *l; + + apath->path.startup_cost = 0; + apath->path.total_cost = 0; + + if (apath->subpaths == NIL) + return; + + if (!apath->path.parallel_aware) + { + Path *subpath = (Path *) linitial(apath->subpaths); + + /* + * Startup cost of non-parallel-aware Append is the startup cost of + * first subpath. + */ + apath->path.startup_cost = subpath->startup_cost; + + /* Compute rows and costs as sums of subplan rows and costs. */ + foreach(l, apath->subpaths) + { + Path *subpath = (Path *) lfirst(l); + + apath->path.rows += subpath->rows; + apath->path.total_cost += subpath->total_cost; + } + } + else /* parallel-aware */ + { + int i = 0; + double parallel_divisor = get_parallel_divisor(&apath->path); + + /* Calculate startup cost. */ + foreach(l, apath->subpaths) + { + Path *subpath = (Path *) lfirst(l); + + /* + * Append will start returning tuples when the child node having + * lowest startup cost is done setting up. We consider only the + * first few subplans that immediately get a worker assigned. + */ + if (i == 0) + apath->path.startup_cost = subpath->startup_cost; + else if (i < apath->path.parallel_workers) + apath->path.startup_cost = Min(apath->path.startup_cost, + subpath->startup_cost); + + /* + * Apply parallel divisor to non-partial subpaths. Also add the + * cost of partial paths to the total cost, but ignore non-partial + * paths for now. + */ + if (i < apath->first_partial_path) + apath->path.rows += subpath->rows / parallel_divisor; + else + { + apath->path.rows += subpath->rows; + apath->path.total_cost += subpath->total_cost; + } + + i++; + } + + /* Add cost for non-partial subpaths. */ + apath->path.total_cost += + append_nonpartial_cost(apath->subpaths, + apath->first_partial_path, + apath->path.parallel_workers); + } +} + /* * cost_merge_append * Determines and returns the cost of a MergeAppend node. diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c index 453f25964a..5e03f8bc21 100644 --- a/src/backend/optimizer/path/joinrels.c +++ b/src/backend/optimizer/path/joinrels.c @@ -1232,7 +1232,8 @@ mark_dummy_rel(RelOptInfo *rel) rel->partial_pathlist = NIL; /* Set up the dummy path */ - add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL)); + add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL, + 0, false, NIL, -1)); /* Set or update cheapest_total_path and related fields */ set_cheapest(rel); diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index d4454779ee..f6c83d0477 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual Index scanrelid, char *enrname); static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, Index scanrelid, int wtParam); -static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels); +static Append *make_append(List *appendplans, int first_partial_plan, + List *tlist, List *partitioned_rels); static RecursiveUnion *make_recursive_union(List *tlist, Plan *lefttree, Plan *righttree, @@ -1059,7 +1060,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) * parent-rel Vars it'll be asked to emit. */ - plan = make_append(subplans, tlist, best_path->partitioned_rels); + plan = make_append(subplans, best_path->first_partial_path, + tlist, best_path->partitioned_rels); copy_generic_path_info(&plan->plan, (Path *) best_path); @@ -5294,7 +5296,8 @@ make_foreignscan(List *qptlist, } static Append * -make_append(List *appendplans, List *tlist, List *partitioned_rels) +make_append(List *appendplans, int first_partial_plan, + List *tlist, List *partitioned_rels) { Append *node = makeNode(Append); Plan *plan = &node->plan; @@ -5305,6 +5308,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels) plan->righttree = NULL; node->partitioned_rels = partitioned_rels; node->appendplans = appendplans; + node->first_partial_plan = first_partial_plan; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index ef2eaeac0a..e8bc15c35d 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -3680,9 +3680,12 @@ create_grouping_paths(PlannerInfo *root, path = (Path *) create_append_path(grouped_rel, paths, + NIL, NULL, 0, - NIL); + false, + NIL, + -1); path->pathtarget = target; } else diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index f620243ab4..a24e8acfa6 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -590,8 +590,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. */ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); - + path = (Path *) create_append_path(result_rel, pathlist, NIL, + NULL, 0, false, NIL, -1); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); @@ -702,7 +702,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. */ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); + path = (Path *) create_append_path(result_rel, pathlist, NIL, + NULL, 0, false, NIL, -1); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index bc0841bf0b..54126fbb6a 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -51,6 +51,8 @@ typedef enum #define STD_FUZZ_FACTOR 1.01 static List *translate_sub_tlist(List *tlist, int relid); +static int append_total_cost_compare(const void *a, const void *b); +static int append_startup_cost_compare(const void *a, const void *b); static List *reparameterize_pathlist_by_child(PlannerInfo *root, List *pathlist, RelOptInfo *child_rel); @@ -1208,44 +1210,50 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, * Note that we must handle subpaths = NIL, representing a dummy access path. */ AppendPath * -create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, - int parallel_workers, List *partitioned_rels) +create_append_path(RelOptInfo *rel, + List *subpaths, List *partial_subpaths, + Relids required_outer, + int parallel_workers, bool parallel_aware, + List *partitioned_rels, double rows) { AppendPath *pathnode = makeNode(AppendPath); ListCell *l; + Assert(!parallel_aware || parallel_workers > 0); + pathnode->path.pathtype = T_Append; pathnode->path.parent = rel; pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = get_appendrel_parampathinfo(rel, required_outer); - pathnode->path.parallel_aware = false; + pathnode->path.parallel_aware = parallel_aware; pathnode->path.parallel_safe = rel->consider_parallel; pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = NIL; /* result is always considered unsorted */ pathnode->partitioned_rels = list_copy(partitioned_rels); - pathnode->subpaths = subpaths; /* - * We don't bother with inventing a cost_append(), but just do it here. - * - * Compute rows and costs as sums of subplan rows and costs. We charge - * nothing extra for the Append itself, which perhaps is too optimistic, - * but since it doesn't do any selection or projection, it is a pretty - * cheap node. + * For parallel append, non-partial paths are sorted by descending total + * costs. That way, the total time to finish all non-partial paths is + * minimized. Also, the partial paths are sorted by descending startup + * costs. There may be some paths that require to do startup work by a + * single worker. In such case, it's better for workers to choose the + * expensive ones first, whereas the leader should choose the cheapest + * startup plan. */ - pathnode->path.rows = 0; - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; + if (pathnode->path.parallel_aware) + { + subpaths = list_qsort(subpaths, append_total_cost_compare); + partial_subpaths = list_qsort(partial_subpaths, + append_startup_cost_compare); + } + pathnode->first_partial_path = list_length(subpaths); + pathnode->subpaths = list_concat(subpaths, partial_subpaths); + foreach(l, subpaths) { Path *subpath = (Path *) lfirst(l); - pathnode->path.rows += subpath->rows; - - if (l == list_head(subpaths)) /* first node? */ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; pathnode->path.parallel_safe = pathnode->path.parallel_safe && subpath->parallel_safe; @@ -1253,9 +1261,53 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); } + Assert(!parallel_aware || pathnode->path.parallel_safe); + + cost_append(pathnode); + + /* If the caller provided a row estimate, override the computed value. */ + if (rows >= 0) + pathnode->path.rows = rows; + return pathnode; } +/* + * append_total_cost_compare + * list_qsort comparator for sorting append child paths by total_cost + */ +static int +append_total_cost_compare(const void *a, const void *b) +{ + Path *path1 = (Path *) lfirst(*(ListCell **) a); + Path *path2 = (Path *) lfirst(*(ListCell **) b); + + if (path1->total_cost > path2->total_cost) + return -1; + if (path1->total_cost < path2->total_cost) + return 1; + + return 0; +} + +/* + * append_startup_cost_compare + * list_qsort comparator for sorting append child paths by startup_cost + */ +static int +append_startup_cost_compare(const void *a, const void *b) +{ + Path *path1 = (Path *) lfirst(*(ListCell **) a); + Path *path2 = (Path *) lfirst(*(ListCell **) b); + + if (path1->startup_cost > path2->startup_cost) + return -1; + if (path1->startup_cost < path2->startup_cost) + return 1; + + return 0; +} + /* * create_merge_append_path * Creates a path corresponding to a MergeAppend plan, returning the diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index e5c3e86709..46f5c4277d 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -517,6 +517,7 @@ RegisterLWLockTranches(void) LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE, "session_typmod_table"); LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append"); /* Register named tranches. */ for (i = 0; i < NamedLWLockTrancheRequests; i++) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 6dcd738be6..0f7a96d85c 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -920,6 +920,15 @@ static struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"enable_parallel_append", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of parallel append plans."), + NULL + }, + &enable_parallel_append, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 16ffbbeea8..842cf3601a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -296,6 +296,7 @@ #enable_material = on #enable_mergejoin = on #enable_nestloop = on +#enable_parallel_append = on #enable_seqscan = on #enable_sort = on #enable_tidscan = on diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index 4e38a1380e..d42d50614c 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -14,10 +14,15 @@ #ifndef NODEAPPEND_H #define NODEAPPEND_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags); extern void ExecEndAppend(AppendState *node); extern void ExecReScanAppend(AppendState *node); +extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt); #endif /* NODEAPPEND_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 084d59ef83..1a35c5c9ad 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -21,6 +21,7 @@ #include "lib/pairingheap.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "storage/spin.h" #include "utils/hsearch.h" #include "utils/queryenvironment.h" #include "utils/reltrigger.h" @@ -1000,13 +1001,22 @@ typedef struct ModifyTableState * whichplan which plan is being executed (0 .. n-1) * ---------------- */ -typedef struct AppendState + +struct AppendState; +typedef struct AppendState AppendState; +struct ParallelAppendState; +typedef struct ParallelAppendState ParallelAppendState; + +struct AppendState { PlanState ps; /* its first field is NodeTag */ PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; -} AppendState; + ParallelAppendState *as_pstate; /* parallel coordination info */ + Size pstate_len; /* size of parallel coordination info */ + bool (*choose_next_subplan) (AppendState *); +}; /* ---------------- * MergeAppendState information diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h index 667d5e269c..711db92576 100644 --- a/src/include/nodes/pg_list.h +++ b/src/include/nodes/pg_list.h @@ -269,6 +269,9 @@ extern void list_free_deep(List *list); extern List *list_copy(const List *list); extern List *list_copy_tail(const List *list, int nskip); +typedef int (*list_qsort_comparator) (const void *a, const void *b); +extern List *list_qsort(const List *list, list_qsort_comparator cmp); + /* * To ease migration to the new list API, a set of compatibility * macros are provided that reduce the impact of the list API changes diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 9b38d44ba0..02fb366680 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -248,6 +248,7 @@ typedef struct Append /* RT indexes of non-leaf tables in a partition tree */ List *partitioned_rels; List *appendplans; + int first_partial_plan; } Append; /* ---------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 51df8e9741..1108b6a0ea 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1255,6 +1255,9 @@ typedef struct CustomPath * AppendPath represents an Append plan, ie, successive execution of * several member plans. * + * For partial Append, 'subpaths' contains non-partial subpaths followed by + * partial subpaths. + * * Note: it is possible for "subpaths" to contain only one, or even no, * elements. These cases are optimized during create_append_plan. * In particular, an AppendPath with no subpaths is a "dummy" path that @@ -1266,6 +1269,9 @@ typedef struct AppendPath /* RT indexes of non-leaf tables in a partition tree */ List *partitioned_rels; List *subpaths; /* list of component Paths */ + + /* Index of first partial path in subpaths */ + int first_partial_path; } AppendPath; #define IS_DUMMY_PATH(p) \ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 6c2317df39..5a1fbf97c3 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -68,6 +68,7 @@ extern bool enable_mergejoin; extern bool enable_hashjoin; extern bool enable_gathermerge; extern bool enable_partition_wise_join; +extern bool enable_parallel_append; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -106,6 +107,7 @@ extern void cost_sort(Path *path, PlannerInfo *root, List *pathkeys, Cost input_cost, double tuples, int width, Cost comparison_cost, int sort_mem, double limit_tuples); +extern void cost_append(AppendPath *path); extern void cost_merge_append(Path *path, PlannerInfo *root, List *pathkeys, int n_streams, Cost input_startup_cost, Cost input_total_cost, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index e9ed16ad32..99f65b44f2 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -14,6 +14,7 @@ #ifndef PATHNODE_H #define PATHNODE_H +#include "nodes/bitmapset.h" #include "nodes/relation.h" @@ -63,9 +64,11 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root, List *bitmapquals); extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, Relids required_outer); -extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths, - Relids required_outer, int parallel_workers, - List *partitioned_rels); +extern AppendPath *create_append_path(RelOptInfo *rel, + List *subpaths, List *partial_subpaths, + Relids required_outer, + int parallel_workers, bool parallel_aware, + List *partitioned_rels, double rows); extern MergeAppendPath *create_merge_append_path(PlannerInfo *root, RelOptInfo *rel, List *subpaths, diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 596fdadc63..460843d73e 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -216,6 +216,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_SESSION_RECORD_TABLE, LWTRANCHE_SESSION_TYPMOD_TABLE, LWTRANCHE_TBM, + LWTRANCHE_PARALLEL_APPEND, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out index fac7b62f9c..a79f891da7 100644 --- a/src/test/regress/expected/inherit.out +++ b/src/test/regress/expected/inherit.out @@ -1404,6 +1404,7 @@ select min(1-id) from matest0; reset enable_indexscan; set enable_seqscan = off; -- plan with fewest seqscans should be merge +set enable_parallel_append = off; -- Don't let parallel-append interfere explain (verbose, costs off) select * from matest0 order by 1-id; QUERY PLAN ------------------------------------------------------------------ @@ -1470,6 +1471,7 @@ select min(1-id) from matest0; (1 row) reset enable_seqscan; +reset enable_parallel_append; drop table matest0 cascade; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to table matest1 diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index b748c98c91..62ed719ccc 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -11,8 +11,88 @@ set parallel_setup_cost=0; set parallel_tuple_cost=0; set min_parallel_table_scan_size=0; set max_parallel_workers_per_gather=4; +-- Parallel Append with partial-subplans explain (costs off) - select count(*) from a_star; + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +----------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 3 + -> Partial Aggregate + -> Parallel Append + -> Parallel Seq Scan on a_star + -> Parallel Seq Scan on b_star + -> Parallel Seq Scan on c_star + -> Parallel Seq Scan on d_star + -> Parallel Seq Scan on e_star + -> Parallel Seq Scan on f_star +(11 rows) + +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 +(1 row) + +-- Parallel Append with both partial and non-partial subplans +alter table c_star set (parallel_workers = 0); +alter table d_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +----------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 3 + -> Partial Aggregate + -> Parallel Append + -> Seq Scan on d_star + -> Seq Scan on c_star + -> Parallel Seq Scan on a_star + -> Parallel Seq Scan on b_star + -> Parallel Seq Scan on e_star + -> Parallel Seq Scan on f_star +(11 rows) + +-- Parallel Append with only non-partial subplans +alter table a_star set (parallel_workers = 0); +alter table b_star set (parallel_workers = 0); +alter table e_star set (parallel_workers = 0); +alter table f_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +-------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 3 + -> Partial Aggregate + -> Parallel Append + -> Seq Scan on d_star + -> Seq Scan on f_star + -> Seq Scan on e_star + -> Seq Scan on b_star + -> Seq Scan on c_star + -> Seq Scan on a_star +(11 rows) + +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 +(1 row) + +-- Disable Parallel Append +alter table a_star reset (parallel_workers); +alter table b_star reset (parallel_workers); +alter table c_star reset (parallel_workers); +alter table d_star reset (parallel_workers); +alter table e_star reset (parallel_workers); +alter table f_star reset (parallel_workers); +set enable_parallel_append to off; +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; QUERY PLAN ----------------------------------------------------- Finalize Aggregate @@ -28,12 +108,13 @@ explain (costs off) -> Parallel Seq Scan on f_star (11 rows) -select count(*) from a_star; - count -------- - 50 +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 (1 row) +reset enable_parallel_append; -- test with leader participation disabled set parallel_leader_participation = off; explain (costs off) diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index cd1f7f301d..2b738aae7c 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -81,11 +81,12 @@ select name, setting from pg_settings where name like 'enable%'; enable_material | on enable_mergejoin | on enable_nestloop | on + enable_parallel_append | on enable_partition_wise_join | off enable_seqscan | on enable_sort | on enable_tidscan | on -(13 rows) +(14 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql index c71febffc2..2e42ae115d 100644 --- a/src/test/regress/sql/inherit.sql +++ b/src/test/regress/sql/inherit.sql @@ -508,11 +508,13 @@ select min(1-id) from matest0; reset enable_indexscan; set enable_seqscan = off; -- plan with fewest seqscans should be merge +set enable_parallel_append = off; -- Don't let parallel-append interfere explain (verbose, costs off) select * from matest0 order by 1-id; select * from matest0 order by 1-id; explain (verbose, costs off) select min(1-id) from matest0; select min(1-id) from matest0; reset enable_seqscan; +reset enable_parallel_append; drop table matest0 cascade; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 00df92779c..d3f2028468 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -15,9 +15,38 @@ set parallel_tuple_cost=0; set min_parallel_table_scan_size=0; set max_parallel_workers_per_gather=4; +-- Parallel Append with partial-subplans explain (costs off) - select count(*) from a_star; -select count(*) from a_star; + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; + +-- Parallel Append with both partial and non-partial subplans +alter table c_star set (parallel_workers = 0); +alter table d_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + +-- Parallel Append with only non-partial subplans +alter table a_star set (parallel_workers = 0); +alter table b_star set (parallel_workers = 0); +alter table e_star set (parallel_workers = 0); +alter table f_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; + +-- Disable Parallel Append +alter table a_star reset (parallel_workers); +alter table b_star reset (parallel_workers); +alter table c_star reset (parallel_workers); +alter table d_star reset (parallel_workers); +alter table e_star reset (parallel_workers); +alter table f_star reset (parallel_workers); +set enable_parallel_append to off; +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; +reset enable_parallel_append; -- test with leader participation disabled set parallel_leader_participation = off;