From f0661c4e8c44c0ec7acd4ea7c82e85b265447398 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 11 Nov 2015 08:57:52 -0500 Subject: [PATCH] Make sequential scans parallel-aware. In addition, this path fills in a number of missing bits and pieces in the parallel infrastructure. Paths and plans now have a parallel_aware flag indicating whether whatever parallel-aware logic they have should be engaged. It is believed that we will need this flag for a number of path/plan types, not just sequential scans, which is why the flag is generic rather than part of the SeqScan structures specifically. Also, execParallel.c now gives parallel nodes a chance to initialize their PlanState nodes from the DSM during parallel worker startup. Amit Kapila, with a fair amount of adjustment by me. Review of previous patch versions by Haribabu Kommi and others. --- src/backend/commands/explain.c | 4 + src/backend/executor/execAmi.c | 9 ++ src/backend/executor/execParallel.c | 54 ++++++++-- src/backend/executor/nodeSeqscan.c | 136 ++++++++++++++++++------ src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/outfuncs.c | 2 + src/backend/nodes/readfuncs.c | 1 + src/backend/optimizer/path/allpaths.c | 2 +- src/backend/optimizer/path/costsize.c | 15 ++- src/backend/optimizer/plan/createplan.c | 44 ++++---- src/backend/optimizer/plan/planner.c | 2 +- src/backend/optimizer/util/pathnode.c | 29 ++++- src/include/executor/nodeSeqscan.h | 6 ++ src/include/nodes/execnodes.h | 12 ++- src/include/nodes/plannodes.h | 5 + src/include/nodes/relation.h | 1 + src/include/optimizer/cost.h | 2 +- src/include/optimizer/pathnode.h | 2 +- 18 files changed, 254 insertions(+), 73 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 7fb8a1458d..183d3d9bcb 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -984,6 +984,8 @@ ExplainNode(PlanState *planstate, List *ancestors, appendStringInfoString(es->str, "-> "); es->indent += 2; } + if (plan->parallel_aware) + appendStringInfoString(es->str, "Parallel "); appendStringInfoString(es->str, pname); es->indent++; } @@ -1000,6 +1002,8 @@ ExplainNode(PlanState *planstate, List *ancestors, ExplainPropertyText("Subplan Name", plan_name, es); if (custom_name) ExplainPropertyText("Custom Plan Provider", custom_name, es); + if (plan->parallel_aware) + ExplainPropertyText("Parallel Aware", "true", es); } switch (nodeTag(plan)) diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 163650cecd..b969fc0803 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -439,6 +439,15 @@ ExecSupportsBackwardScan(Plan *node) if (node == NULL) return false; + /* + * Parallel-aware nodes return a subset of the tuples in each worker, + * and in general we can't expect to have enough bookkeeping state to + * know which ones we returned in this worker as opposed to some other + * worker. + */ + if (node->parallel_aware) + return false; + switch (nodeTag(node)) { case T_Result: diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 99a9de3cdc..eae13c5647 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,7 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeSeqscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" #include "optimizer/planmain.h" @@ -167,10 +168,16 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) /* Count this node. */ e->nnodes++; - /* - * XXX. Call estimators for parallel-aware nodes here, when we have - * some. - */ + /* Call estimators for parallel-aware nodes. */ + switch (nodeTag(planstate)) + { + case T_SeqScanState: + ExecSeqScanEstimate((SeqScanState *) planstate, + e->pcxt); + break; + default: + break; + } return planstate_tree_walker(planstate, ExecParallelEstimate, e); } @@ -205,10 +212,16 @@ ExecParallelInitializeDSM(PlanState *planstate, /* Count this node. */ d->nnodes++; - /* - * XXX. Call initializers for parallel-aware plan nodes, when we have - * some. - */ + /* Call initializers for parallel-aware plan nodes. */ + switch (nodeTag(planstate)) + { + case T_SeqScanState: + ExecSeqScanInitializeDSM((SeqScanState *) planstate, + d->pcxt); + break; + default: + break; + } return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d); } @@ -574,6 +587,30 @@ ExecParallelReportInstrumentation(PlanState *planstate, instrumentation); } +/* + * Initialize the PlanState and its descendents with the information + * retrieved from shared memory. This has to be done once the PlanState + * is allocated and initialized by executor; that is, after ExecutorStart(). + */ +static bool +ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) +{ + if (planstate == NULL) + return false; + + /* Call initializers for parallel-aware plan nodes. */ + switch (nodeTag(planstate)) + { + case T_SeqScanState: + ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); + break; + default: + break; + } + + return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc); +} + /* * Main entrypoint for parallel query worker processes. * @@ -610,6 +647,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Start up the executor, have it run the plan, and then shut it down. */ ExecutorStart(queryDesc, 0); + ExecParallelInitializeWorker(queryDesc->planstate, toc); ExecutorRun(queryDesc, ForwardScanDirection, 0L); ExecutorFinish(queryDesc); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 3cb81fccc3..b858f2f3af 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -19,6 +19,10 @@ * ExecInitSeqScan creates and initializes a seqscan node. * ExecEndSeqScan releases any storage allocated. * ExecReScanSeqScan rescans the relation + * + * ExecSeqScanEstimate estimates DSM space needed for parallel scan + * ExecSeqScanInitializeDSM initialize DSM for parallel scan + * ExecSeqScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" @@ -53,10 +57,22 @@ SeqNext(SeqScanState *node) /* * get information from the estate and scan state */ - scandesc = node->ss_currentScanDesc; - estate = node->ps.state; + scandesc = node->ss.ss_currentScanDesc; + estate = node->ss.ps.state; direction = estate->es_direction; - slot = node->ss_ScanTupleSlot; + slot = node->ss.ss_ScanTupleSlot; + + if (scandesc == NULL) + { + /* + * We reach here if the scan is not parallel, or if we're executing + * a scan that was intended to be parallel serially. + */ + scandesc = heap_beginscan(node->ss.ss_currentRelation, + estate->es_snapshot, + 0, NULL); + node->ss.ss_currentScanDesc = scandesc; + } /* * get the next tuple from the table @@ -123,27 +139,19 @@ static void InitScanRelation(SeqScanState *node, EState *estate, int eflags) { Relation currentRelation; - HeapScanDesc currentScanDesc; /* * get the relation object id from the relid'th entry in the range table, * open that relation and acquire appropriate lock on it. */ currentRelation = ExecOpenScanRelation(estate, - ((SeqScan *) node->ps.plan)->scanrelid, + ((SeqScan *) node->ss.ps.plan)->scanrelid, eflags); - /* initialize a heapscan */ - currentScanDesc = heap_beginscan(currentRelation, - estate->es_snapshot, - 0, - NULL); - - node->ss_currentRelation = currentRelation; - node->ss_currentScanDesc = currentScanDesc; + node->ss.ss_currentRelation = currentRelation; /* and report the scan tuple slot's rowtype */ - ExecAssignScanType(node, RelationGetDescr(currentRelation)); + ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation)); } @@ -167,44 +175,44 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) * create state structure */ scanstate = makeNode(SeqScanState); - scanstate->ps.plan = (Plan *) node; - scanstate->ps.state = estate; + scanstate->ss.ps.plan = (Plan *) node; + scanstate->ss.ps.state = estate; /* * Miscellaneous initialization * * create expression context for node */ - ExecAssignExprContext(estate, &scanstate->ps); + ExecAssignExprContext(estate, &scanstate->ss.ps); /* * initialize child expressions */ - scanstate->ps.targetlist = (List *) + scanstate->ss.ps.targetlist = (List *) ExecInitExpr((Expr *) node->plan.targetlist, (PlanState *) scanstate); - scanstate->ps.qual = (List *) + scanstate->ss.ps.qual = (List *) ExecInitExpr((Expr *) node->plan.qual, (PlanState *) scanstate); /* * tuple table initialization */ - ExecInitResultTupleSlot(estate, &scanstate->ps); - ExecInitScanTupleSlot(estate, scanstate); + ExecInitResultTupleSlot(estate, &scanstate->ss.ps); + ExecInitScanTupleSlot(estate, &scanstate->ss); /* * initialize scan relation */ InitScanRelation(scanstate, estate, eflags); - scanstate->ps.ps_TupFromTlist = false; + scanstate->ss.ps.ps_TupFromTlist = false; /* * Initialize result tuple type and projection info. */ - ExecAssignResultTypeFromTL(&scanstate->ps); - ExecAssignScanProjectionInfo(scanstate); + ExecAssignResultTypeFromTL(&scanstate->ss.ps); + ExecAssignScanProjectionInfo(&scanstate->ss); return scanstate; } @@ -224,24 +232,25 @@ ExecEndSeqScan(SeqScanState *node) /* * get information from node */ - relation = node->ss_currentRelation; - scanDesc = node->ss_currentScanDesc; + relation = node->ss.ss_currentRelation; + scanDesc = node->ss.ss_currentScanDesc; /* * Free the exprcontext */ - ExecFreeExprContext(&node->ps); + ExecFreeExprContext(&node->ss.ps); /* * clean out the tuple table */ - ExecClearTuple(node->ps.ps_ResultTupleSlot); - ExecClearTuple(node->ss_ScanTupleSlot); + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + ExecClearTuple(node->ss.ss_ScanTupleSlot); /* * close heap scan */ - heap_endscan(scanDesc); + if (scanDesc != NULL) + heap_endscan(scanDesc); /* * close the heap relation. @@ -265,10 +274,71 @@ ExecReScanSeqScan(SeqScanState *node) { HeapScanDesc scan; - scan = node->ss_currentScanDesc; + scan = node->ss.ss_currentScanDesc; - heap_rescan(scan, /* scan desc */ - NULL); /* new scan keys */ + if (scan != NULL) + heap_rescan(scan, /* scan desc */ + NULL); /* new scan keys */ ExecScanReScan((ScanState *) node); } + +/* ---------------------------------------------------------------- + * Parallel Scan Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecSeqScanEstimate + * + * estimates the space required to serialize seqscan node. + * ---------------------------------------------------------------- + */ +void +ExecSeqScanEstimate(SeqScanState *node, + ParallelContext *pcxt) +{ + EState *estate = node->ss.ps.state; + + node->pscan_len = heap_parallelscan_estimate(estate->es_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecSeqScanInitializeDSM + * + * Set up a parallel heap scan descriptor. + * ---------------------------------------------------------------- + */ +void +ExecSeqScanInitializeDSM(SeqScanState *node, + ParallelContext *pcxt) +{ + EState *estate = node->ss.ps.state; + ParallelHeapScanDesc pscan; + + pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); + heap_parallelscan_initialize(pscan, + node->ss.ss_currentRelation, + estate->es_snapshot); + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); + node->ss.ss_currentScanDesc = + heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); +} + +/* ---------------------------------------------------------------- + * ExecSeqScanInitializeWorker + * + * Copy relevant information from TOC into planstate. + * ---------------------------------------------------------------- + */ +void +ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc) +{ + ParallelHeapScanDesc pscan; + + pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); + node->ss.ss_currentScanDesc = + heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index c176ff978e..26264cbfab 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -112,6 +112,7 @@ CopyPlanFields(const Plan *from, Plan *newnode) COPY_SCALAR_FIELD(total_cost); COPY_SCALAR_FIELD(plan_rows); COPY_SCALAR_FIELD(plan_width); + COPY_SCALAR_FIELD(parallel_aware); COPY_SCALAR_FIELD(plan_node_id); COPY_NODE_FIELD(targetlist); COPY_NODE_FIELD(qual); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 3d3a7744b5..ab2fdc434e 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -271,6 +271,7 @@ _outPlanInfo(StringInfo str, const Plan *node) WRITE_FLOAT_FIELD(total_cost, "%.2f"); WRITE_FLOAT_FIELD(plan_rows, "%.0f"); WRITE_INT_FIELD(plan_width); + WRITE_BOOL_FIELD(parallel_aware); WRITE_INT_FIELD(plan_node_id); WRITE_NODE_FIELD(targetlist); WRITE_NODE_FIELD(qual); @@ -1585,6 +1586,7 @@ _outPathInfo(StringInfo str, const Path *node) _outBitmapset(str, node->param_info->ppi_req_outer); else _outBitmapset(str, NULL); + WRITE_BOOL_FIELD(parallel_aware); WRITE_FLOAT_FIELD(rows, "%.0f"); WRITE_FLOAT_FIELD(startup_cost, "%.2f"); WRITE_FLOAT_FIELD(total_cost, "%.2f"); diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 94ba6dc0b9..5e258c939f 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1412,6 +1412,7 @@ ReadCommonPlan(Plan *local_node) READ_FLOAT_FIELD(total_cost); READ_FLOAT_FIELD(plan_rows); READ_INT_FIELD(plan_width); + READ_BOOL_FIELD(parallel_aware); READ_INT_FIELD(plan_node_id); READ_NODE_FIELD(targetlist); READ_NODE_FIELD(qual); diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 8fc1cfd15f..47de4eeba8 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -475,7 +475,7 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) required_outer = rel->lateral_relids; /* Consider sequential scan */ - add_path(rel, create_seqscan_path(root, rel, required_outer)); + add_path(rel, create_seqscan_path(root, rel, required_outer, 0)); /* Consider index scans */ create_index_paths(root, rel); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 1b61fd9d4e..e604992f73 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -181,10 +181,13 @@ clamp_row_est(double nrows) * * 'baserel' is the relation to be scanned * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + * 'nworkers' are the number of workers among which the work will be + * distributed if the scan is parallel scan */ void cost_seqscan(Path *path, PlannerInfo *root, - RelOptInfo *baserel, ParamPathInfo *param_info) + RelOptInfo *baserel, ParamPathInfo *param_info, + int nworkers) { Cost startup_cost = 0; Cost run_cost = 0; @@ -222,6 +225,16 @@ cost_seqscan(Path *path, PlannerInfo *root, cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple; run_cost += cpu_per_tuple * baserel->tuples; + /* + * Primitive parallel cost model. Assume the leader will do half as much + * work as a regular worker, because it will also need to read the tuples + * returned by the workers when they percolate up to the gather ndoe. + * This is almost certainly not exactly the right way to model this, so + * this will probably need to be changed at some point... + */ + if (nworkers > 0) + run_cost = run_cost / (nworkers + 0.5); + path->startup_cost = startup_cost; path->total_cost = startup_cost + run_cost; } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index e70a337328..411b36c418 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -101,7 +101,7 @@ static List *fix_indexorderby_references(PlannerInfo *root, IndexPath *index_pat static Node *fix_indexqual_operand(Node *node, IndexOptInfo *index, int indexcol); static List *get_switched_clauses(List *clauses, Relids outerrelids); static List *order_qual_clauses(PlannerInfo *root, List *clauses); -static void copy_path_costsize(Plan *dest, Path *src); +static void copy_generic_path_info(Plan *dest, Path *src); static void copy_plan_costsize(Plan *dest, Plan *src); static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid); static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid, @@ -779,7 +779,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path) * prepare_sort_from_pathkeys on it before we do so on the individual * child plans, to make cross-checking the sort info easier. */ - copy_path_costsize(plan, (Path *) best_path); + copy_generic_path_info(plan, (Path *) best_path); plan->targetlist = tlist; plan->qual = NIL; plan->lefttree = NULL; @@ -901,7 +901,7 @@ create_material_plan(PlannerInfo *root, MaterialPath *best_path) plan = make_material(subplan); - copy_path_costsize(&plan->plan, (Path *) best_path); + copy_generic_path_info(&plan->plan, (Path *) best_path); return plan; } @@ -1129,7 +1129,7 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path) best_path->single_copy, subplan); - copy_path_costsize(&gather_plan->plan, &best_path->path); + copy_generic_path_info(&gather_plan->plan, &best_path->path); /* use parallel mode for parallel plans. */ root->glob->parallelModeNeeded = true; @@ -1178,7 +1178,7 @@ create_seqscan_plan(PlannerInfo *root, Path *best_path, scan_clauses, scan_relid); - copy_path_costsize(&scan_plan->plan, best_path); + copy_generic_path_info(&scan_plan->plan, best_path); return scan_plan; } @@ -1224,7 +1224,7 @@ create_samplescan_plan(PlannerInfo *root, Path *best_path, scan_relid, tsc); - copy_path_costsize(&scan_plan->scan.plan, best_path); + copy_generic_path_info(&scan_plan->scan.plan, best_path); return scan_plan; } @@ -1422,7 +1422,7 @@ create_indexscan_plan(PlannerInfo *root, indexorderbyops, best_path->indexscandir); - copy_path_costsize(&scan_plan->plan, &best_path->path); + copy_generic_path_info(&scan_plan->plan, &best_path->path); return scan_plan; } @@ -1538,7 +1538,7 @@ create_bitmap_scan_plan(PlannerInfo *root, bitmapqualorig, baserelid); - copy_path_costsize(&scan_plan->scan.plan, &best_path->path); + copy_generic_path_info(&scan_plan->scan.plan, &best_path->path); return scan_plan; } @@ -1795,7 +1795,7 @@ create_tidscan_plan(PlannerInfo *root, TidPath *best_path, scan_relid, tidquals); - copy_path_costsize(&scan_plan->scan.plan, &best_path->path); + copy_generic_path_info(&scan_plan->scan.plan, &best_path->path); return scan_plan; } @@ -1836,7 +1836,7 @@ create_subqueryscan_plan(PlannerInfo *root, Path *best_path, scan_relid, best_path->parent->subplan); - copy_path_costsize(&scan_plan->scan.plan, best_path); + copy_generic_path_info(&scan_plan->scan.plan, best_path); return scan_plan; } @@ -1879,7 +1879,7 @@ create_functionscan_plan(PlannerInfo *root, Path *best_path, scan_plan = make_functionscan(tlist, scan_clauses, scan_relid, functions, rte->funcordinality); - copy_path_costsize(&scan_plan->scan.plan, best_path); + copy_generic_path_info(&scan_plan->scan.plan, best_path); return scan_plan; } @@ -1923,7 +1923,7 @@ create_valuesscan_plan(PlannerInfo *root, Path *best_path, scan_plan = make_valuesscan(tlist, scan_clauses, scan_relid, values_lists); - copy_path_costsize(&scan_plan->scan.plan, best_path); + copy_generic_path_info(&scan_plan->scan.plan, best_path); return scan_plan; } @@ -2016,7 +2016,7 @@ create_ctescan_plan(PlannerInfo *root, Path *best_path, scan_plan = make_ctescan(tlist, scan_clauses, scan_relid, plan_id, cte_param_id); - copy_path_costsize(&scan_plan->scan.plan, best_path); + copy_generic_path_info(&scan_plan->scan.plan, best_path); return scan_plan; } @@ -2076,7 +2076,7 @@ create_worktablescan_plan(PlannerInfo *root, Path *best_path, scan_plan = make_worktablescan(tlist, scan_clauses, scan_relid, cteroot->wt_param_id); - copy_path_costsize(&scan_plan->scan.plan, best_path); + copy_generic_path_info(&scan_plan->scan.plan, best_path); return scan_plan; } @@ -2132,7 +2132,7 @@ create_foreignscan_plan(PlannerInfo *root, ForeignPath *best_path, tlist, scan_clauses); /* Copy cost data from Path to Plan; no need to make FDW do this */ - copy_path_costsize(&scan_plan->scan.plan, &best_path->path); + copy_generic_path_info(&scan_plan->scan.plan, &best_path->path); /* Copy foreign server OID; likewise, no need to make FDW do this */ scan_plan->fs_server = rel->serverid; @@ -2238,7 +2238,7 @@ create_customscan_plan(PlannerInfo *root, CustomPath *best_path, * Copy cost data from Path to Plan; no need to make custom-plan providers * do this */ - copy_path_costsize(&cplan->scan.plan, &best_path->path); + copy_generic_path_info(&cplan->scan.plan, &best_path->path); /* Likewise, copy the relids that are represented by this custom scan */ cplan->custom_relids = best_path->path.parent->relids; @@ -2355,7 +2355,7 @@ create_nestloop_plan(PlannerInfo *root, inner_plan, best_path->jointype); - copy_path_costsize(&join_plan->join.plan, &best_path->path); + copy_generic_path_info(&join_plan->join.plan, &best_path->path); return join_plan; } @@ -2650,7 +2650,7 @@ create_mergejoin_plan(PlannerInfo *root, best_path->jpath.jointype); /* Costs of sort and material steps are included in path cost already */ - copy_path_costsize(&join_plan->join.plan, &best_path->jpath.path); + copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path); return join_plan; } @@ -2775,7 +2775,7 @@ create_hashjoin_plan(PlannerInfo *root, (Plan *) hash_plan, best_path->jpath.jointype); - copy_path_costsize(&join_plan->join.plan, &best_path->jpath.path); + copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path); return join_plan; } @@ -3411,9 +3411,11 @@ order_qual_clauses(PlannerInfo *root, List *clauses) /* * Copy cost and size info from a Path node to the Plan node created from it. * The executor usually won't use this info, but it's needed by EXPLAIN. + * + * Also copy the parallel-aware flag, which the executor will use. */ static void -copy_path_costsize(Plan *dest, Path *src) +copy_generic_path_info(Plan *dest, Path *src) { if (src) { @@ -3421,6 +3423,7 @@ copy_path_costsize(Plan *dest, Path *src) dest->total_cost = src->total_cost; dest->plan_rows = src->rows; dest->plan_width = src->parent->width; + dest->parallel_aware = src->parallel_aware; } else { @@ -3428,6 +3431,7 @@ copy_path_costsize(Plan *dest, Path *src) dest->total_cost = 0; dest->plan_rows = 0; dest->plan_width = 0; + dest->parallel_aware = false; } } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 536b55e493..fa1ab3a46c 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -4690,7 +4690,7 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid) comparisonCost = 2.0 * (indexExprCost.startup + indexExprCost.per_tuple); /* Estimate the cost of seq scan + sort */ - seqScanPath = create_seqscan_path(root, rel, NULL); + seqScanPath = create_seqscan_path(root, rel, NULL, 0); cost_sort(&seqScanAndSortPath, root, NIL, seqScanPath->total_cost, rel->tuples, rel->width, comparisonCost, maintenance_work_mem, -1.0); diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 1895a6894a..09c3244546 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -696,7 +696,8 @@ add_path_precheck(RelOptInfo *parent_rel, * pathnode. */ Path * -create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer) +create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, + Relids required_outer, int nworkers) { Path *pathnode = makeNode(Path); @@ -704,9 +705,10 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer) pathnode->parent = rel; pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->parallel_aware = nworkers > 0 ? true : false; pathnode->pathkeys = NIL; /* seqscan has unordered result */ - cost_seqscan(pathnode, root, rel, pathnode->param_info); + cost_seqscan(pathnode, root, rel, pathnode->param_info, nworkers); return pathnode; } @@ -724,6 +726,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer pathnode->parent = rel; pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->parallel_aware = false; pathnode->pathkeys = NIL; /* samplescan has unordered result */ cost_samplescan(pathnode, root, rel, pathnode->param_info); @@ -777,6 +780,7 @@ create_index_path(PlannerInfo *root, pathnode->path.parent = rel; pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = pathkeys; /* Convert clauses to indexquals the executor can handle */ @@ -822,6 +826,7 @@ create_bitmap_heap_path(PlannerInfo *root, pathnode->path.parent = rel; pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = NIL; /* always unordered */ pathnode->bitmapqual = bitmapqual; @@ -847,6 +852,7 @@ create_bitmap_and_path(PlannerInfo *root, pathnode->path.pathtype = T_BitmapAnd; pathnode->path.parent = rel; pathnode->path.param_info = NULL; /* not used in bitmap trees */ + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = NIL; /* always unordered */ pathnode->bitmapquals = bitmapquals; @@ -871,6 +877,7 @@ create_bitmap_or_path(PlannerInfo *root, pathnode->path.pathtype = T_BitmapOr; pathnode->path.parent = rel; pathnode->path.param_info = NULL; /* not used in bitmap trees */ + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = NIL; /* always unordered */ pathnode->bitmapquals = bitmapquals; @@ -895,6 +902,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, pathnode->path.parent = rel; pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = NIL; /* always unordered */ pathnode->tidquals = tidquals; @@ -922,6 +930,7 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer) pathnode->path.parent = rel; pathnode->path.param_info = get_appendrel_parampathinfo(rel, required_outer); + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = NIL; /* result is always considered * unsorted */ pathnode->subpaths = subpaths; @@ -975,6 +984,7 @@ create_merge_append_path(PlannerInfo *root, pathnode->path.parent = rel; pathnode->path.param_info = get_appendrel_parampathinfo(rel, required_outer); + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = pathkeys; pathnode->subpaths = subpaths; @@ -1049,6 +1059,7 @@ create_result_path(List *quals) pathnode->path.pathtype = T_Result; pathnode->path.parent = NULL; pathnode->path.param_info = NULL; /* there are no other rels... */ + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = NIL; pathnode->quals = quals; @@ -1082,6 +1093,7 @@ create_material_path(RelOptInfo *rel, Path *subpath) pathnode->path.pathtype = T_Material; pathnode->path.parent = rel; pathnode->path.param_info = subpath->param_info; + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = subpath->pathkeys; pathnode->subpath = subpath; @@ -1142,6 +1154,7 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.pathtype = T_Unique; pathnode->path.parent = rel; pathnode->path.param_info = subpath->param_info; + pathnode->path.parallel_aware = false; /* * Assume the output is unsorted, since we don't necessarily have pathkeys @@ -1323,6 +1336,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.parent = rel; pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = NIL; /* Gather has unordered result */ pathnode->subpath = subpath; @@ -1378,6 +1392,7 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->parent = rel; pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->parallel_aware = false; pathnode->pathkeys = pathkeys; cost_subqueryscan(pathnode, root, rel, pathnode->param_info); @@ -1400,6 +1415,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->parent = rel; pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->parallel_aware = false; pathnode->pathkeys = pathkeys; cost_functionscan(pathnode, root, rel, pathnode->param_info); @@ -1422,6 +1438,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->parent = rel; pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->parallel_aware = false; pathnode->pathkeys = NIL; /* result is always unordered */ cost_valuesscan(pathnode, root, rel, pathnode->param_info); @@ -1443,6 +1460,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer) pathnode->parent = rel; pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->parallel_aware = false; pathnode->pathkeys = NIL; /* XXX for now, result is always unordered */ cost_ctescan(pathnode, root, rel, pathnode->param_info); @@ -1465,6 +1483,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->parent = rel; pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->parallel_aware = false; pathnode->pathkeys = NIL; /* result is always unordered */ /* Cost is the same as for a regular CTE scan */ @@ -1496,6 +1515,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.parent = rel; pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->path.parallel_aware = false; pathnode->path.rows = rows; pathnode->path.startup_cost = startup_cost; pathnode->path.total_cost = total_cost; @@ -1630,6 +1650,7 @@ create_nestloop_path(PlannerInfo *root, sjinfo, required_outer, &restrict_clauses); + pathnode->path.parallel_aware = false; pathnode->path.pathkeys = pathkeys; pathnode->jointype = jointype; pathnode->outerjoinpath = outer_path; @@ -1687,6 +1708,7 @@ create_mergejoin_path(PlannerInfo *root, sjinfo, required_outer, &restrict_clauses); + pathnode->jpath.path.parallel_aware = false; pathnode->jpath.path.pathkeys = pathkeys; pathnode->jpath.jointype = jointype; pathnode->jpath.outerjoinpath = outer_path; @@ -1743,6 +1765,7 @@ create_hashjoin_path(PlannerInfo *root, sjinfo, required_outer, &restrict_clauses); + pathnode->jpath.path.parallel_aware = false; /* * A hashjoin never has pathkeys, since its output ordering is @@ -1798,7 +1821,7 @@ reparameterize_path(PlannerInfo *root, Path *path, switch (path->pathtype) { case T_SeqScan: - return create_seqscan_path(root, rel, required_outer); + return create_seqscan_path(root, rel, required_outer, 0); case T_SampleScan: return (Path *) create_samplescan_path(root, rel, required_outer); case T_IndexScan: diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h index 39d12a62fc..f8f9299b65 100644 --- a/src/include/executor/nodeSeqscan.h +++ b/src/include/executor/nodeSeqscan.h @@ -14,6 +14,7 @@ #ifndef NODESEQSCAN_H #define NODESEQSCAN_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags); @@ -21,4 +22,9 @@ extern TupleTableSlot *ExecSeqScan(SeqScanState *node); extern void ExecEndSeqScan(SeqScanState *node); extern void ExecReScanSeqScan(SeqScanState *node); +/* parallel scan support */ +extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt); +extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt); +extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc); + #endif /* NODESEQSCAN_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 58ec889b2f..eb3591a663 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1248,11 +1248,15 @@ typedef struct ScanState TupleTableSlot *ss_ScanTupleSlot; } ScanState; -/* - * SeqScan uses a bare ScanState as its state node, since it needs - * no additional fields. +/* ---------------- + * SeqScanState information + * ---------------- */ -typedef ScanState SeqScanState; +typedef struct SeqScanState +{ + ScanState ss; /* its first field is NodeTag */ + Size pscan_len; /* size of parallel heap scan descriptor */ +} SeqScanState; /* ---------------- * SampleScanState information diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 6b28c8e28f..292219db51 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -108,6 +108,11 @@ typedef struct Plan double plan_rows; /* number of rows plan is expected to emit */ int plan_width; /* average row width in bytes */ + /* + * information needed for parallel query + */ + bool parallel_aware; /* engage parallel-aware logic? */ + /* * Common structural data for all Plan types. */ diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 6cf2e24ce7..d7406cc614 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -753,6 +753,7 @@ typedef struct Path RelOptInfo *parent; /* the relation this path can build */ ParamPathInfo *param_info; /* parameterization info, or NULL if none */ + bool parallel_aware; /* engage parallel-aware logic? */ /* estimated size/costs for path (see costsize.c for more info) */ double rows; /* estimated number of result tuples */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 25a730362a..ac21a3a181 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -72,7 +72,7 @@ extern double clamp_row_est(double nrows); extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, - ParamPathInfo *param_info); + ParamPathInfo *param_info, int nworkers); extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_index(IndexPath *path, PlannerInfo *root, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 7a4940c7d2..f28b4e2b06 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -31,7 +31,7 @@ extern bool add_path_precheck(RelOptInfo *parent_rel, List *pathkeys, Relids required_outer); extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, - Relids required_outer); + Relids required_outer, int nworkers); extern Path *create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); extern IndexPath *create_index_path(PlannerInfo *root, -- 2.40.0