From 3bd909b220930f21d6e15833a17947be749e7fde Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 30 Sep 2015 19:23:36 -0400 Subject: [PATCH] Add a Gather executor node. A Gather executor node runs any number of copies of a plan in an equal number of workers and merges all of the results into a single tuple stream. It can also run the plan itself, if the workers are unavailable or haven't started up yet. It is intended to work with the Partial Seq Scan node which will be added in future commits. It could also be used to implement parallel query of a different sort by itself, without help from Partial Seq Scan, if the single_copy mode is used. In that mode, a worker executes the plan, and the parallel leader does not, merely collecting the worker's results. So, a Gather node could be inserted into a plan to split the execution of that plan across two processes. Nested Gather nodes aren't currently supported, but we might want to add support for that in the future. There's nothing in the planner to actually generate Gather nodes yet, so it's not quite time to break out the champagne. But we're getting close. Amit Kapila. Some designs suggestions were provided by me, and I also reviewed the patch. Single-copy mode, documentation, and other minor changes also by me. --- doc/src/sgml/config.sgml | 46 +++ src/backend/commands/explain.c | 19 ++ src/backend/executor/Makefile | 4 +- src/backend/executor/execAmi.c | 8 + src/backend/executor/execMain.c | 3 + src/backend/executor/execParallel.c | 14 +- src/backend/executor/execProcnode.c | 46 +++ src/backend/executor/nodeGather.c | 299 ++++++++++++++++++ src/backend/nodes/copyfuncs.c | 25 ++ src/backend/nodes/outfuncs.c | 14 + src/backend/optimizer/path/costsize.c | 38 +++ src/backend/optimizer/plan/createplan.c | 57 ++++ src/backend/optimizer/plan/setrefs.c | 1 + src/backend/optimizer/plan/subselect.c | 1 + src/backend/optimizer/util/pathnode.c | 26 ++ src/backend/utils/misc/guc.c | 30 ++ src/backend/utils/misc/postgresql.conf.sample | 3 + src/include/executor/executor.h | 1 + src/include/executor/nodeGather.h | 25 ++ src/include/nodes/execnodes.h | 16 + src/include/nodes/nodes.h | 3 + src/include/nodes/plannodes.h | 11 + src/include/nodes/relation.h | 13 + src/include/optimizer/cost.h | 7 + src/include/optimizer/pathnode.h | 3 + src/tools/pgindent/typedefs.list | 4 + 26 files changed, 709 insertions(+), 8 deletions(-) create mode 100644 src/backend/executor/nodeGather.c create mode 100644 src/include/executor/nodeGather.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 5c6d93fe07..696f286b76 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1928,6 +1928,22 @@ include_dir 'conf.d' + + + max_parallel_degree (integer) + + max_parallel_degree configuration parameter + + + + + Sets the maximum degree of parallelism for an individual parallel + operation. Note that the requested number of workers may not actually + be available at runtime. Parallel workers are taken from the pool + of processes established by . + + + @@ -3398,6 +3414,36 @@ include_dir 'conf.d' + + parallel_tuple_cost (floating point) + + parallel_tuple_cost configuration parameter + + + + + Sets the planner's estimate of the cost of transferring a tuple + from a parallel worker process to another process. + The default is 0.1. + + + + + + parallel_setup_cost (floating point) + + parallel_setup_cost configuration parameter + + + + + Sets the planner's estimate of the cost of launching parallel worker + processes. + The default is 1000. + + + + effective_cache_size (integer) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index f0d9e94eed..7fb8a1458d 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -853,6 +853,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_SampleScan: pname = sname = "Sample Scan"; break; + case T_Gather: + pname = sname = "Gather"; + break; case T_IndexScan: pname = sname = "Index Scan"; break; @@ -1276,6 +1279,22 @@ ExplainNode(PlanState *planstate, List *ancestors, show_instrumentation_count("Rows Removed by Filter", 1, planstate, es); break; + case T_Gather: + { + Gather *gather = (Gather *) plan; + + show_scan_qual(plan->qual, "Filter", planstate, ancestors, es); + if (plan->qual) + show_instrumentation_count("Rows Removed by Filter", 1, + planstate, es); + ExplainPropertyInteger("Number of Workers", + gather->num_workers, es); + if (gather->single_copy) + ExplainPropertyText("Single Copy", + gather->single_copy ? "true" : "false", + es); + } + break; case T_FunctionScan: if (es->verbose) { diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index f5e1e1aefc..51edd4c5e7 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -17,8 +17,8 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ execScan.o execTuples.o \ execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \ nodeBitmapAnd.o nodeBitmapOr.o \ - nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \ - nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ + nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeGather.o \ + nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ nodeLimit.o nodeLockRows.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \ nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \ diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 93e1e9a691..163650cecd 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -24,6 +24,7 @@ #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" #include "executor/nodeFunctionscan.h" +#include "executor/nodeGather.h" #include "executor/nodeGroup.h" #include "executor/nodeGroup.h" #include "executor/nodeHash.h" @@ -160,6 +161,10 @@ ExecReScan(PlanState *node) ExecReScanSampleScan((SampleScanState *) node); break; + case T_GatherState: + ExecReScanGather((GatherState *) node); + break; + case T_IndexScanState: ExecReScanIndexScan((IndexScanState *) node); break; @@ -467,6 +472,9 @@ ExecSupportsBackwardScan(Plan *node) /* Simplify life for tablesample methods by disallowing this */ return false; + case T_Gather: + return false; + case T_IndexScan: return IndexSupportsBackwardScan(((IndexScan *) node)->indexid) && TargetListSupportsBackwardScan(node->targetlist); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 85ff46b802..37b7bbd413 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -347,6 +347,9 @@ standard_ExecutorRun(QueryDesc *queryDesc, direction, dest); + /* Allow nodes to release or shut down resources. */ + (void) ExecShutdownNode(queryDesc->planstate); + /* * shutdown tuple receiver, if we started it */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index a409a9a571..e6930c1d51 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -71,7 +71,7 @@ typedef struct ExecParallelInitializeDSMContext } ExecParallelInitializeDSMContext; /* Helper functions that run in the parallel leader. */ -static char *ExecSerializePlan(Plan *plan, List *rangetable); +static char *ExecSerializePlan(Plan *plan, EState *estate); static bool ExecParallelEstimate(PlanState *node, ExecParallelEstimateContext *e); static bool ExecParallelInitializeDSM(PlanState *node, @@ -88,7 +88,7 @@ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); * Create a serialized representation of the plan to be sent to each worker. */ static char * -ExecSerializePlan(Plan *plan, List *rangetable) +ExecSerializePlan(Plan *plan, EState *estate) { PlannedStmt *pstmt; ListCell *tlist; @@ -125,13 +125,13 @@ ExecSerializePlan(Plan *plan, List *rangetable) pstmt->canSetTag = 1; pstmt->transientPlan = 0; pstmt->planTree = plan; - pstmt->rtable = rangetable; + pstmt->rtable = estate->es_range_table; pstmt->resultRelations = NIL; pstmt->utilityStmt = NULL; pstmt->subplans = NIL; pstmt->rewindPlanIDs = NULL; pstmt->rowMarks = NIL; - pstmt->nParamExec = 0; + pstmt->nParamExec = estate->es_plannedstmt->nParamExec; pstmt->relationOids = NIL; pstmt->invalItems = NIL; /* workers can't replan anyway... */ pstmt->hasRowSecurity = false; @@ -271,7 +271,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) pei->planstate = planstate; /* Fix up and serialize plan to be sent to workers. */ - pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table); + pstmt_data = ExecSerializePlan(planstate->plan, estate); /* Create a parallel context. */ pcxt = CreateParallelContext(ParallelQueryMain, nworkers); @@ -568,7 +568,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecutorStart(queryDesc, 0); ExecutorRun(queryDesc, ForwardScanDirection, 0L); ExecutorFinish(queryDesc); - ExecutorEnd(queryDesc); /* Report buffer usage during parallel execution. */ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE); @@ -579,6 +578,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecParallelReportInstrumentation(queryDesc->planstate, instrumentation); + /* Must do this after capturing instrumentation. */ + ExecutorEnd(queryDesc); + /* Cleanup. */ FreeQueryDesc(queryDesc); (*receiver->rDestroy) (receiver); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 03c2febc3e..5bc1d48942 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -100,6 +100,7 @@ #include "executor/nodeMergejoin.h" #include "executor/nodeModifyTable.h" #include "executor/nodeNestloop.h" +#include "executor/nodeGather.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" #include "executor/nodeSamplescan.h" @@ -113,6 +114,7 @@ #include "executor/nodeValuesscan.h" #include "executor/nodeWindowAgg.h" #include "executor/nodeWorktablescan.h" +#include "nodes/nodeFuncs.h" #include "miscadmin.h" @@ -307,6 +309,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_Gather: + result = (PlanState *) ExecInitGather((Gather *) node, + estate, eflags); + break; + case T_Hash: result = (PlanState *) ExecInitHash((Hash *) node, estate, eflags); @@ -504,6 +511,10 @@ ExecProcNode(PlanState *node) result = ExecUnique((UniqueState *) node); break; + case T_GatherState: + result = ExecGather((GatherState *) node); + break; + case T_HashState: result = ExecHash((HashState *) node); break; @@ -658,6 +669,10 @@ ExecEndNode(PlanState *node) ExecEndSampleScan((SampleScanState *) node); break; + case T_GatherState: + ExecEndGather((GatherState *) node); + break; + case T_IndexScanState: ExecEndIndexScan((IndexScanState *) node); break; @@ -769,3 +784,34 @@ ExecEndNode(PlanState *node) break; } } + +/* + * ExecShutdownNode + * + * Give execution nodes a chance to stop asynchronous resource consumption + * and release any resources still held. Currently, this is only used for + * parallel query, but we might want to extend it to other cases also (e.g. + * FDW). We might also want to call it sooner, as soon as it's evident that + * no more rows will be needed (e.g. when a Limit is filled) rather than only + * at the end of ExecutorRun. + */ +bool +ExecShutdownNode(PlanState *node) +{ + if (node == NULL) + return false; + + switch (nodeTag(node)) + { + case T_GatherState: + { + ExecShutdownGather((GatherState *) node); + return true; + } + break; + default: + break; + } + + return planstate_tree_walker(node, ExecShutdownNode, NULL); +} diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c new file mode 100644 index 0000000000..735dbaa222 --- /dev/null +++ b/src/backend/executor/nodeGather.c @@ -0,0 +1,299 @@ +/*------------------------------------------------------------------------- + * + * nodeGather.c + * Support routines for scanning a plan via multiple workers. + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/nodeGather.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/relscan.h" +#include "executor/execdebug.h" +#include "executor/execParallel.h" +#include "executor/nodeGather.h" +#include "executor/nodeSubplan.h" +#include "executor/tqueue.h" +#include "utils/rel.h" + + +static TupleTableSlot *gather_getnext(GatherState *gatherstate); + + +/* ---------------------------------------------------------------- + * ExecInitGather + * ---------------------------------------------------------------- + */ +GatherState * +ExecInitGather(Gather *node, EState *estate, int eflags) +{ + GatherState *gatherstate; + + /* Gather node doesn't have innerPlan node. */ + Assert(innerPlan(node) == NULL); + + /* + * create state structure + */ + gatherstate = makeNode(GatherState); + gatherstate->ps.plan = (Plan *) node; + gatherstate->ps.state = estate; + gatherstate->need_to_scan_workers = false; + gatherstate->need_to_scan_locally = !node->single_copy; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, &gatherstate->ps); + + /* + * initialize child expressions + */ + gatherstate->ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, + (PlanState *) gatherstate); + gatherstate->ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, + (PlanState *) gatherstate); + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &gatherstate->ps); + + /* + * now initialize outer plan + */ + outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags); + + + gatherstate->ps.ps_TupFromTlist = false; + + /* + * Initialize result tuple type and projection info. + */ + ExecAssignResultTypeFromTL(&gatherstate->ps); + ExecAssignProjectionInfo(&gatherstate->ps, NULL); + + return gatherstate; +} + +/* ---------------------------------------------------------------- + * ExecGather(node) + * + * Scans the relation via multiple workers and returns + * the next qualifying tuple. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecGather(GatherState *node) +{ + int i; + TupleTableSlot *slot; + + /* + * Initialize the parallel context and workers on first execution. We do + * this on first execution rather than during node initialization, as it + * needs to allocate large dynamic segement, so it is better to do if it + * is really needed. + */ + if (!node->pei) + { + EState *estate = node->ps.state; + + /* Initialize the workers required to execute Gather node. */ + node->pei = ExecInitParallelPlan(node->ps.lefttree, + estate, + ((Gather *) (node->ps.plan))->num_workers); + + /* + * Register backend workers. If the required number of workers are not + * available then we perform the scan with available workers and if + * there are no more workers available, then the Gather node will just + * scan locally. + */ + LaunchParallelWorkers(node->pei->pcxt); + + node->funnel = CreateTupleQueueFunnel(); + + for (i = 0; i < node->pei->pcxt->nworkers; ++i) + { + if (node->pei->pcxt->worker[i].bgwhandle) + { + shm_mq_set_handle(node->pei->tqueue[i], + node->pei->pcxt->worker[i].bgwhandle); + RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]); + node->need_to_scan_workers = true; + } + } + + /* If no workers are available, we must always scan locally. */ + if (!node->need_to_scan_workers) + node->need_to_scan_locally = true; + } + + slot = gather_getnext(node); + + if (TupIsNull(slot)) + { + /* + * Destroy the parallel context once we complete fetching all the + * tuples. Otherwise, the DSM and workers will stick around for the + * lifetime of the entire statement. + */ + ExecShutdownGather(node); + } + return slot; +} + +/* ---------------------------------------------------------------- + * ExecEndGather + * + * frees any storage allocated through C routines. + * ---------------------------------------------------------------- + */ +void +ExecEndGather(GatherState *node) +{ + ExecShutdownGather(node); + ExecFreeExprContext(&node->ps); + ExecClearTuple(node->ps.ps_ResultTupleSlot); + ExecEndNode(outerPlanState(node)); +} + +/* + * gather_getnext + * + * Get the next tuple from shared memory queue. This function + * is reponsible for fetching tuples from all the queues associated + * with worker backends used in Gather node execution and if there is + * no data available from queues or no worker is available, it does + * fetch the data from local node. + */ +TupleTableSlot * +gather_getnext(GatherState *gatherstate) +{ + PlanState *outerPlan; + TupleTableSlot *outerTupleSlot; + TupleTableSlot *slot; + HeapTuple tup; + + /* + * We can use projection info of Gather for the tuples received from + * worker backends as currently for all cases worker backends sends the + * projected tuple as required by Gather node. + */ + slot = gatherstate->ps.ps_ProjInfo->pi_slot; + + while (gatherstate->need_to_scan_workers || + gatherstate->need_to_scan_locally) + { + if (gatherstate->need_to_scan_workers) + { + bool done = false; + + /* wait only if local scan is done */ + tup = TupleQueueFunnelNext(gatherstate->funnel, + gatherstate->need_to_scan_locally, + &done); + if (done) + gatherstate->need_to_scan_workers = false; + + if (HeapTupleIsValid(tup)) + { + ExecStoreTuple(tup, /* tuple to store */ + slot, /* slot to store in */ + InvalidBuffer, /* buffer associated with this + * tuple */ + true); /* pfree this pointer if not from heap */ + + return slot; + } + } + + if (gatherstate->need_to_scan_locally) + { + outerPlan = outerPlanState(gatherstate); + + outerTupleSlot = ExecProcNode(outerPlan); + + if (!TupIsNull(outerTupleSlot)) + return outerTupleSlot; + + gatherstate->need_to_scan_locally = false; + } + } + + return ExecClearTuple(slot); +} + +/* ---------------------------------------------------------------- + * ExecShutdownGather + * + * Destroy the setup for parallel workers. Collect all the + * stats after workers are stopped, else some work done by + * workers won't be accounted. + * ---------------------------------------------------------------- + */ +void +ExecShutdownGather(GatherState *node) +{ + Gather *gather; + + if (node->pei == NULL || node->pei->pcxt == NULL) + return; + + /* + * Ensure all workers have finished before destroying the parallel context + * to ensure a clean exit. + */ + if (node->funnel) + { + DestroyTupleQueueFunnel(node->funnel); + node->funnel = NULL; + } + + ExecParallelFinish(node->pei); + + /* destroy parallel context. */ + DestroyParallelContext(node->pei->pcxt); + node->pei->pcxt = NULL; + + gather = (Gather *) node->ps.plan; + node->need_to_scan_locally = !gather->single_copy; + node->need_to_scan_workers = false; +} + +/* ---------------------------------------------------------------- + * Join Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecReScanGather + * + * Re-initialize the workers and rescans a relation via them. + * ---------------------------------------------------------------- + */ +void +ExecReScanGather(GatherState *node) +{ + /* + * Re-initialize the parallel context and workers to perform rescan of + * relation. We want to gracefully shutdown all the workers so that they + * should be able to propagate any error or other information to master + * backend before dying. + */ + ExecShutdownGather(node); + + ExecReScan(node->ps.lefttree); +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 4b4ddec4c2..88dc085870 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -314,6 +314,28 @@ _copyBitmapOr(const BitmapOr *from) return newnode; } +/* + * _copyGather + */ +static Gather * +_copyGather(const Gather *from) +{ + Gather *newnode = makeNode(Gather); + + /* + * copy node superclass fields + */ + CopyPlanFields((const Plan *) from, (Plan *) newnode); + + /* + * copy remainder of node + */ + COPY_SCALAR_FIELD(num_workers); + COPY_SCALAR_FIELD(single_copy); + + return newnode; +} + /* * CopyScanFields @@ -4235,6 +4257,9 @@ copyObject(const void *from) case T_Scan: retval = _copyScan(from); break; + case T_Gather: + retval = _copyGather(from); + break; case T_SeqScan: retval = _copySeqScan(from); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index ee9c360345..4645ecb804 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -432,6 +432,17 @@ _outBitmapOr(StringInfo str, const BitmapOr *node) WRITE_NODE_FIELD(bitmapplans); } +static void +_outGather(StringInfo str, const Gather *node) +{ + WRITE_NODE_TYPE("GATHER"); + + _outPlanInfo(str, (const Plan *) node); + + WRITE_UINT_FIELD(num_workers); + WRITE_UINT_FIELD(single_copy); +} + static void _outScan(StringInfo str, const Scan *node) { @@ -3000,6 +3011,9 @@ _outNode(StringInfo str, const void *obj) case T_BitmapOr: _outBitmapOr(str, obj); break; + case T_Gather: + _outGather(str, obj); + break; case T_Scan: _outScan(str, obj); break; diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index d107d76a3c..1b61fd9d4e 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -11,6 +11,8 @@ * cpu_tuple_cost Cost of typical CPU time to process a tuple * cpu_index_tuple_cost Cost of typical CPU time to process an index tuple * cpu_operator_cost Cost of CPU time to execute an operator or function + * parallel_tuple_cost Cost of CPU time to pass a tuple from worker to master backend + * parallel_setup_cost Cost of setting up shared memory for parallelism * * We expect that the kernel will typically do some amount of read-ahead * optimization; this in conjunction with seek costs means that seq_page_cost @@ -102,11 +104,15 @@ double random_page_cost = DEFAULT_RANDOM_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; +double parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST; +double parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST; int effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE; Cost disable_cost = 1.0e10; +int max_parallel_degree = 0; + bool enable_seqscan = true; bool enable_indexscan = true; bool enable_indexonlyscan = true; @@ -289,6 +295,38 @@ cost_samplescan(Path *path, PlannerInfo *root, path->total_cost = startup_cost + run_cost; } +/* + * cost_gather + * Determines and returns the cost of gather path. + * + * 'rel' is the relation to be operated upon + * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + */ +void +cost_gather(GatherPath *path, PlannerInfo *root, + RelOptInfo *rel, ParamPathInfo *param_info) +{ + Cost startup_cost = 0; + Cost run_cost = 0; + + /* Mark the path with the correct row estimate */ + if (param_info) + path->path.rows = param_info->ppi_rows; + else + path->path.rows = rel->rows; + + startup_cost = path->subpath->startup_cost; + + run_cost = path->subpath->total_cost - path->subpath->startup_cost; + + /* Parallel setup and communication cost. */ + startup_cost += parallel_setup_cost; + run_cost += parallel_tuple_cost * rel->tuples; + + path->path.startup_cost = startup_cost; + path->path.total_cost = (startup_cost + run_cost); +} + /* * cost_index * Determines and returns the cost of scanning a relation using an index. diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 404c6f593d..0ee7392bcc 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -60,6 +60,8 @@ static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); static SampleScan *create_samplescan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); +static Gather *create_gather_plan(PlannerInfo *root, + GatherPath *best_path); static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path, List *tlist, List *scan_clauses, bool indexonly); static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root, @@ -104,6 +106,8 @@ static void copy_plan_costsize(Plan *dest, Plan *src); static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid); static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid, TableSampleClause *tsc); +static Gather *make_gather(List *qptlist, List *qpqual, + int nworkers, bool single_copy, Plan *subplan); static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid, Oid indexid, List *indexqual, List *indexqualorig, List *indexorderby, List *indexorderbyorig, @@ -273,6 +277,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path) plan = create_unique_plan(root, (UniquePath *) best_path); break; + case T_Gather: + plan = (Plan *) create_gather_plan(root, + (GatherPath *) best_path); + break; default: elog(ERROR, "unrecognized node type: %d", (int) best_path->pathtype); @@ -1101,6 +1109,34 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path) return plan; } +/* + * create_gather_plan + * + * Create a Gather plan for 'best_path' and (recursively) plans + * for its subpaths. + */ +static Gather * +create_gather_plan(PlannerInfo *root, GatherPath *best_path) +{ + Gather *gather_plan; + Plan *subplan; + + subplan = create_plan_recurse(root, best_path->subpath); + + gather_plan = make_gather(subplan->targetlist, + NIL, + best_path->num_workers, + best_path->single_copy, + subplan); + + copy_path_costsize(&gather_plan->plan, &best_path->path); + + /* use parallel mode for parallel plans. */ + root->glob->parallelModeNeeded = true; + + return gather_plan; +} + /***************************************************************************** * @@ -4735,6 +4771,27 @@ make_unique(Plan *lefttree, List *distinctList) return node; } +static Gather * +make_gather(List *qptlist, + List *qpqual, + int nworkers, + bool single_copy, + Plan *subplan) +{ + Gather *node = makeNode(Gather); + Plan *plan = &node->plan; + + /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = subplan; + plan->righttree = NULL; + node->num_workers = nworkers; + node->single_copy = single_copy; + + return node; +} + /* * distinctList is a list of SortGroupClauses, identifying the targetlist * items that should be considered by the SetOp filter. The input path must diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 3c8169725a..b1cede2ef0 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -607,6 +607,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_Sort: case T_Unique: case T_SetOp: + case T_Gather: /* * These plan types don't actually bother to evaluate their diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index d0bc412c83..6b32f85d6c 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2584,6 +2584,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params, case T_Material: case T_Sort: case T_Unique: + case T_Gather: case T_SetOp: case T_Group: break; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 4336ca1b78..1895a6894a 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1307,6 +1307,32 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, return pathnode; } +/* + * create_gather_path + * + * Creates a path corresponding to a gather scan, returning the + * pathnode. + */ +GatherPath * +create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, + Relids required_outer, int nworkers) +{ + GatherPath *pathnode = makeNode(GatherPath); + + pathnode->path.pathtype = T_Gather; + pathnode->path.parent = rel; + pathnode->path.param_info = get_baserel_parampathinfo(root, rel, + required_outer); + pathnode->path.pathkeys = NIL; /* Gather has unordered result */ + + pathnode->subpath = subpath; + pathnode->num_workers = nworkers; + + cost_gather(pathnode, root, rel, pathnode->path.param_info); + + return pathnode; +} + /* * translate_sub_tlist - get subquery column numbers represented by tlist * diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 17053aff68..7684bff79b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2534,6 +2534,16 @@ static struct config_int ConfigureNamesInt[] = check_autovacuum_max_workers, NULL, NULL }, + { + {"max_parallel_degree", PGC_SUSET, RESOURCES_ASYNCHRONOUS, + gettext_noop("Sets the maximum number of parallel processes per executor node."), + NULL + }, + &max_parallel_degree, + 0, 0, MAX_BACKENDS, + NULL, NULL, NULL + }, + { {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM, gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."), @@ -2711,6 +2721,26 @@ static struct config_real ConfigureNamesReal[] = DEFAULT_CPU_OPERATOR_COST, 0, DBL_MAX, NULL, NULL, NULL }, + { + {"parallel_tuple_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "passing each tuple (row) from worker to master backend."), + NULL + }, + ¶llel_tuple_cost, + DEFAULT_PARALLEL_TUPLE_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, + { + {"parallel_setup_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "starting up worker processes for parallel query."), + NULL + }, + ¶llel_setup_cost, + DEFAULT_PARALLEL_SETUP_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, { {"cursor_tuple_fraction", PGC_USERSET, QUERY_TUNING_OTHER, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 8c65287e30..b2adda9595 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -164,6 +164,7 @@ #effective_io_concurrency = 1 # 1-1000; 0 disables prefetching #max_worker_processes = 8 +#max_parallel_degree = 0 # max number of worker processes per node #------------------------------------------------------------------------------ @@ -290,6 +291,8 @@ #cpu_tuple_cost = 0.01 # same scale as above #cpu_index_tuple_cost = 0.005 # same scale as above #cpu_operator_cost = 0.0025 # same scale as above +#parallel_tuple_cost = 0.1 # same scale as above +#parallel_setup_cost = 1000.0 # same scale as above #effective_cache_size = 4GB # - Genetic Query Optimizer - diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 226f905c3c..4f77692aa3 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -225,6 +225,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags); extern TupleTableSlot *ExecProcNode(PlanState *node); extern Node *MultiExecProcNode(PlanState *node); extern void ExecEndNode(PlanState *node); +extern bool ExecShutdownNode(PlanState *node); /* * prototypes from functions in execQual.c diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h new file mode 100644 index 0000000000..9e5d8fc153 --- /dev/null +++ b/src/include/executor/nodeGather.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * nodeGather.h + * prototypes for nodeGather.c + * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodeGather.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEGATHER_H +#define NODEGATHER_H + +#include "nodes/execnodes.h" + +extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags); +extern TupleTableSlot *ExecGather(GatherState *node); +extern void ExecEndGather(GatherState *node); +extern void ExecShutdownGather(GatherState *node); +extern void ExecReScanGather(GatherState *node); + +#endif /* NODEGATHER_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4ae2f3e067..b6895f94c3 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1950,6 +1950,22 @@ typedef struct UniqueState MemoryContext tempContext; /* short-term context for comparisons */ } UniqueState; +/* ---------------- + * GatherState information + * + * Gather nodes launch 1 or more parallel workers, run a subplan + * in those workers, and collect the results. + * ---------------- + */ +typedef struct GatherState +{ + PlanState ps; /* its first field is NodeTag */ + struct ParallelExecutorInfo *pei; + struct TupleQueueFunnel *funnel; + bool need_to_scan_workers; + bool need_to_scan_locally; +} GatherState; + /* ---------------- * HashState information * ---------------- diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 274480e2c9..94bdb7c9af 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -74,6 +74,7 @@ typedef enum NodeTag T_Agg, T_WindowAgg, T_Unique, + T_Gather, T_Hash, T_SetOp, T_LockRows, @@ -121,6 +122,7 @@ typedef enum NodeTag T_AggState, T_WindowAggState, T_UniqueState, + T_GatherState, T_HashState, T_SetOpState, T_LockRowsState, @@ -238,6 +240,7 @@ typedef enum NodeTag T_ResultPath, T_MaterialPath, T_UniquePath, + T_GatherPath, T_EquivalenceClass, T_EquivalenceMember, T_PathKey, diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 1e2d2bbaa1..1f9213c09b 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -753,6 +753,17 @@ typedef struct Unique Oid *uniqOperators; /* equality operators to compare with */ } Unique; +/* ------------ + * gather node + * ------------ + */ +typedef struct Gather +{ + Plan plan; + int num_workers; + bool single_copy; +} Gather; + /* ---------------- * hash build node * diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 961b5d17cf..6cf2e24ce7 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1046,6 +1046,19 @@ typedef struct UniquePath List *uniq_exprs; /* expressions to be made unique */ } UniquePath; +/* + * GatherPath runs several copies of a plan in parallel and collects the + * results. The parallel leader may also execute the plan, unless the + * single_copy flag is set. + */ +typedef struct GatherPath +{ + Path path; + Path *subpath; /* path for each worker */ + int num_workers; /* number of workers sought to help */ + bool single_copy; /* path must not be executed >1x */ +} GatherPath; + /* * All join-type paths share these fields. */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index dd43e45d0c..25a730362a 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -26,6 +26,8 @@ #define DEFAULT_CPU_TUPLE_COST 0.01 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 #define DEFAULT_CPU_OPERATOR_COST 0.0025 +#define DEFAULT_PARALLEL_TUPLE_COST 0.1 +#define DEFAULT_PARALLEL_SETUP_COST 1000.0 #define DEFAULT_EFFECTIVE_CACHE_SIZE 524288 /* measured in pages */ @@ -48,8 +50,11 @@ extern PGDLLIMPORT double random_page_cost; extern PGDLLIMPORT double cpu_tuple_cost; extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost; +extern PGDLLIMPORT double parallel_tuple_cost; +extern PGDLLIMPORT double parallel_setup_cost; extern PGDLLIMPORT int effective_cache_size; extern Cost disable_cost; +extern int max_parallel_degree; extern bool enable_seqscan; extern bool enable_indexscan; extern bool enable_indexonlyscan; @@ -144,6 +149,8 @@ extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path, JoinCostWorkspace *workspace, SpecialJoinInfo *sjinfo, SemiAntiJoinFactors *semifactors); +extern void cost_gather(GatherPath *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan); extern void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root); extern void cost_qual_eval_node(QualCost *cost, Node *qual, PlannerInfo *root); diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 161644c343..7a4940c7d2 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -69,6 +69,9 @@ extern ResultPath *create_result_path(List *quals); extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath); extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, SpecialJoinInfo *sjinfo); +extern GatherPath *create_gather_path(PlannerInfo *root, + RelOptInfo *rel, Path *subpath, Relids required_outer, + int nworkers); extern Path *create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, List *pathkeys, Relids required_outer); extern Path *create_functionscan_path(PlannerInfo *root, RelOptInfo *rel, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0e149ea2f4..feb821b409 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -707,6 +707,9 @@ FunctionParameterMode FunctionScan FunctionScanPerFuncState FunctionScanState +Gather +GatherPath +GatherState FuzzyAttrMatchState GBT_NUMKEY GBT_NUMKEY_R @@ -1195,6 +1198,7 @@ OverrideSearchPath OverrideStackEntry PACE_HEADER PACL +ParallelExecutorInfo PATH PBOOL PCtxtHandle -- 2.40.0