]> granicus.if.org Git - postgresql/commitdiff
Make sequential scans parallel-aware.
author Robert Haas <rhaas@postgresql.org>
Wed, 11 Nov 2015 13:57:52 +0000 (08:57 -0500)
committer Robert Haas <rhaas@postgresql.org>
Wed, 11 Nov 2015 13:57:52 +0000 (08:57 -0500)
In addition, this patch fills in a number of missing bits and pieces in
the parallel infrastructure.  Paths and plans now have a parallel_aware
flag indicating whether whatever parallel-aware logic they have should
be engaged.  It is believed that we will need this flag for a number of
path/plan types, not just sequential scans, which is why the flag is
generic rather than part of the SeqScan structures specifically.
Also, execParallel.c now gives parallel nodes a chance to initialize
their PlanState nodes from the DSM during parallel worker startup.

Amit Kapila, with a fair amount of adjustment by me.  Review of previous
patch versions by Haribabu Kommi and others.

18 files changed:
src/backend/commands/explain.c
src/backend/executor/execAmi.c
src/backend/executor/execParallel.c
src/backend/executor/nodeSeqscan.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/allpaths.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/plan/planner.c
src/backend/optimizer/util/pathnode.c
src/include/executor/nodeSeqscan.h
src/include/nodes/execnodes.h
src/include/nodes/plannodes.h
src/include/nodes/relation.h
src/include/optimizer/cost.h
src/include/optimizer/pathnode.h

index 7fb8a1458dfa427d87b296a6f5d8768d8aac5170..183d3d9bcb77af298986812f84bc4502214cb563 100644 (file)
@@ -984,6 +984,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
                        appendStringInfoString(es->str, "->  ");
                        es->indent += 2;
                }
+               if (plan->parallel_aware)
+                       appendStringInfoString(es->str, "Parallel ");
                appendStringInfoString(es->str, pname);
                es->indent++;
        }
@@ -1000,6 +1002,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
                        ExplainPropertyText("Subplan Name", plan_name, es);
                if (custom_name)
                        ExplainPropertyText("Custom Plan Provider", custom_name, es);
+               if (plan->parallel_aware)
+                       ExplainPropertyText("Parallel Aware", "true", es);
        }
 
        switch (nodeTag(plan))
index 163650cecd1cb748af692a7dfa74b20f0fad013c..b969fc080374860108aec808f3afef50e888a10f 100644 (file)
@@ -439,6 +439,15 @@ ExecSupportsBackwardScan(Plan *node)
        if (node == NULL)
                return false;
 
+       /*
+        * Parallel-aware nodes return a subset of the tuples in each worker,
+        * and in general we can't expect to have enough bookkeeping state to
+        * know which ones we returned in this worker as opposed to some other
+        * worker.
+        */
+       if (node->parallel_aware)
+               return false;
+
        switch (nodeTag(node))
        {
                case T_Result:
index 99a9de3cdc397fde4dab1623cdb968508ff87445..eae13c5647752c79a42eff96406ed50b014edae5 100644 (file)
@@ -25,6 +25,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeSeqscan.h"
 #include "executor/tqueue.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/planmain.h"
@@ -167,10 +168,16 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
        /* Count this node. */
        e->nnodes++;
 
-       /*
-        * XXX. Call estimators for parallel-aware nodes here, when we have
-        * some.
-        */
+       /* Call estimators for parallel-aware nodes. */
+       switch (nodeTag(planstate))
+       {
+               case T_SeqScanState:
+                       ExecSeqScanEstimate((SeqScanState *) planstate,
+                                                               e->pcxt);
+                       break;
+               default:
+                       break;
+       }
 
        return planstate_tree_walker(planstate, ExecParallelEstimate, e);
 }
@@ -205,10 +212,16 @@ ExecParallelInitializeDSM(PlanState *planstate,
        /* Count this node. */
        d->nnodes++;
 
-       /*
-        * XXX. Call initializers for parallel-aware plan nodes, when we have
-        * some.
-        */
+       /* Call initializers for parallel-aware plan nodes. */
+       switch (nodeTag(planstate))
+       {
+               case T_SeqScanState:
+                       ExecSeqScanInitializeDSM((SeqScanState *) planstate,
+                                                                        d->pcxt);
+                       break;
+               default:
+                       break;
+       }
 
        return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
 }
@@ -574,6 +587,30 @@ ExecParallelReportInstrumentation(PlanState *planstate,
                                                                 instrumentation);
 }
 
+/*
+ * Initialize the PlanState and its descendents with the information
+ * retrieved from shared memory.  This has to be done once the PlanState
+ * is allocated and initialized by executor; that is, after ExecutorStart().
+ */
+static bool
+ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
+{
+       if (planstate == NULL)
+               return false;
+
+       /* Call initializers for parallel-aware plan nodes. */
+       switch (nodeTag(planstate))
+       {
+               case T_SeqScanState:
+                       ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
+                       break;
+               default:
+                       break;
+       }
+
+       return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
+}
+
 /*
  * Main entrypoint for parallel query worker processes.
  *
@@ -610,6 +647,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 
        /* Start up the executor, have it run the plan, and then shut it down. */
        ExecutorStart(queryDesc, 0);
+       ExecParallelInitializeWorker(queryDesc->planstate, toc);
        ExecutorRun(queryDesc, ForwardScanDirection, 0L);
        ExecutorFinish(queryDesc);
 
index 3cb81fccc30c6378f99c6dcf0c92d01efbfdf101..b858f2f3af89fbad0702f7ce9430ab0ebdd6a547 100644 (file)
  *             ExecInitSeqScan                 creates and initializes a seqscan node.
  *             ExecEndSeqScan                  releases any storage allocated.
  *             ExecReScanSeqScan               rescans the relation
+ *
+ *             ExecSeqScanEstimate             estimates DSM space needed for parallel scan
+ *             ExecSeqScanInitializeDSM initialize DSM for parallel scan
+ *             ExecSeqScanInitializeWorker attach to DSM info in parallel worker
  */
 #include "postgres.h"
 
@@ -53,10 +57,22 @@ SeqNext(SeqScanState *node)
        /*
         * get information from the estate and scan state
         */
-       scandesc = node->ss_currentScanDesc;
-       estate = node->ps.state;
+       scandesc = node->ss.ss_currentScanDesc;
+       estate = node->ss.ps.state;
        direction = estate->es_direction;
-       slot = node->ss_ScanTupleSlot;
+       slot = node->ss.ss_ScanTupleSlot;
+
+       if (scandesc == NULL)
+       {
+               /*
+                * We reach here if the scan is not parallel, or if we're executing
+                * a scan that was intended to be parallel serially.
+                */
+               scandesc = heap_beginscan(node->ss.ss_currentRelation,
+                                                                 estate->es_snapshot,
+                                                                 0, NULL);
+               node->ss.ss_currentScanDesc = scandesc;
+       }
 
        /*
         * get the next tuple from the table
@@ -123,27 +139,19 @@ static void
 InitScanRelation(SeqScanState *node, EState *estate, int eflags)
 {
        Relation        currentRelation;
-       HeapScanDesc currentScanDesc;
 
        /*
         * get the relation object id from the relid'th entry in the range table,
         * open that relation and acquire appropriate lock on it.
         */
        currentRelation = ExecOpenScanRelation(estate,
-                                                                         ((SeqScan *) node->ps.plan)->scanrelid,
+                                                                         ((SeqScan *) node->ss.ps.plan)->scanrelid,
                                                                                   eflags);
 
-       /* initialize a heapscan */
-       currentScanDesc = heap_beginscan(currentRelation,
-                                                                        estate->es_snapshot,
-                                                                        0,
-                                                                        NULL);
-
-       node->ss_currentRelation = currentRelation;
-       node->ss_currentScanDesc = currentScanDesc;
+       node->ss.ss_currentRelation = currentRelation;
 
        /* and report the scan tuple slot's rowtype */
-       ExecAssignScanType(node, RelationGetDescr(currentRelation));
+       ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
 }
 
 
@@ -167,44 +175,44 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
         * create state structure
         */
        scanstate = makeNode(SeqScanState);
-       scanstate->ps.plan = (Plan *) node;
-       scanstate->ps.state = estate;
+       scanstate->ss.ps.plan = (Plan *) node;
+       scanstate->ss.ps.state = estate;
 
        /*
         * Miscellaneous initialization
         *
         * create expression context for node
         */
-       ExecAssignExprContext(estate, &scanstate->ps);
+       ExecAssignExprContext(estate, &scanstate->ss.ps);
 
        /*
         * initialize child expressions
         */
-       scanstate->ps.targetlist = (List *)
+       scanstate->ss.ps.targetlist = (List *)
                ExecInitExpr((Expr *) node->plan.targetlist,
                                         (PlanState *) scanstate);
-       scanstate->ps.qual = (List *)
+       scanstate->ss.ps.qual = (List *)
                ExecInitExpr((Expr *) node->plan.qual,
                                         (PlanState *) scanstate);
 
        /*
         * tuple table initialization
         */
-       ExecInitResultTupleSlot(estate, &scanstate->ps);
-       ExecInitScanTupleSlot(estate, scanstate);
+       ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
+       ExecInitScanTupleSlot(estate, &scanstate->ss);
 
        /*
         * initialize scan relation
         */
        InitScanRelation(scanstate, estate, eflags);
 
-       scanstate->ps.ps_TupFromTlist = false;
+       scanstate->ss.ps.ps_TupFromTlist = false;
 
        /*
         * Initialize result tuple type and projection info.
         */
-       ExecAssignResultTypeFromTL(&scanstate->ps);
-       ExecAssignScanProjectionInfo(scanstate);
+       ExecAssignResultTypeFromTL(&scanstate->ss.ps);
+       ExecAssignScanProjectionInfo(&scanstate->ss);
 
        return scanstate;
 }
@@ -224,24 +232,25 @@ ExecEndSeqScan(SeqScanState *node)
        /*
         * get information from node
         */
-       relation = node->ss_currentRelation;
-       scanDesc = node->ss_currentScanDesc;
+       relation = node->ss.ss_currentRelation;
+       scanDesc = node->ss.ss_currentScanDesc;
 
        /*
         * Free the exprcontext
         */
-       ExecFreeExprContext(&node->ps);
+       ExecFreeExprContext(&node->ss.ps);
 
        /*
         * clean out the tuple table
         */
-       ExecClearTuple(node->ps.ps_ResultTupleSlot);
-       ExecClearTuple(node->ss_ScanTupleSlot);
+       ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+       ExecClearTuple(node->ss.ss_ScanTupleSlot);
 
        /*
         * close heap scan
         */
-       heap_endscan(scanDesc);
+       if (scanDesc != NULL)
+               heap_endscan(scanDesc);
 
        /*
         * close the heap relation.
@@ -265,10 +274,71 @@ ExecReScanSeqScan(SeqScanState *node)
 {
        HeapScanDesc scan;
 
-       scan = node->ss_currentScanDesc;
+       scan = node->ss.ss_currentScanDesc;
 
-       heap_rescan(scan,                       /* scan desc */
-                               NULL);                  /* new scan keys */
+       if (scan != NULL)
+               heap_rescan(scan,                       /* scan desc */
+                                       NULL);                  /* new scan keys */
 
        ExecScanReScan((ScanState *) node);
 }
+
+/* ----------------------------------------------------------------
+ *                                             Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *             ExecSeqScanEstimate
+ *
+ *             estimates the space required to serialize seqscan node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanEstimate(SeqScanState *node,
+                                       ParallelContext *pcxt)
+{
+       EState     *estate = node->ss.ps.state;
+
+       node->pscan_len = heap_parallelscan_estimate(estate->es_snapshot);
+       shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecSeqScanInitializeDSM
+ *
+ *             Set up a parallel heap scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanInitializeDSM(SeqScanState *node,
+                                                ParallelContext *pcxt)
+{
+       EState     *estate = node->ss.ps.state;
+       ParallelHeapScanDesc    pscan;
+
+       pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
+       heap_parallelscan_initialize(pscan,
+                                                                node->ss.ss_currentRelation,
+                                                                estate->es_snapshot);
+       shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+       node->ss.ss_currentScanDesc =
+               heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecSeqScanInitializeWorker
+ *
+ *             Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
+{
+       ParallelHeapScanDesc    pscan;
+
+       pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+       node->ss.ss_currentScanDesc =
+               heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+}
index c176ff978ea306f9542caab8e2618213436fd42b..26264cbfab49e0ba52ab7c1370585d9a64ef81b8 100644 (file)
@@ -112,6 +112,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
        COPY_SCALAR_FIELD(total_cost);
        COPY_SCALAR_FIELD(plan_rows);
        COPY_SCALAR_FIELD(plan_width);
+       COPY_SCALAR_FIELD(parallel_aware);
        COPY_SCALAR_FIELD(plan_node_id);
        COPY_NODE_FIELD(targetlist);
        COPY_NODE_FIELD(qual);
index 3d3a7744b528e5aa5cfd1e68e4682df8fced0e69..ab2fdc434e0203f82f1887d9a005ea4467e41c82 100644 (file)
@@ -271,6 +271,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
        WRITE_FLOAT_FIELD(total_cost, "%.2f");
        WRITE_FLOAT_FIELD(plan_rows, "%.0f");
        WRITE_INT_FIELD(plan_width);
+       WRITE_BOOL_FIELD(parallel_aware);
        WRITE_INT_FIELD(plan_node_id);
        WRITE_NODE_FIELD(targetlist);
        WRITE_NODE_FIELD(qual);
@@ -1585,6 +1586,7 @@ _outPathInfo(StringInfo str, const Path *node)
                _outBitmapset(str, node->param_info->ppi_req_outer);
        else
                _outBitmapset(str, NULL);
+       WRITE_BOOL_FIELD(parallel_aware);
        WRITE_FLOAT_FIELD(rows, "%.0f");
        WRITE_FLOAT_FIELD(startup_cost, "%.2f");
        WRITE_FLOAT_FIELD(total_cost, "%.2f");
index 94ba6dc0b9b1efa7c3286fb052c7dfb05c31fa3e..5e258c939f2c484d7f1579478da175a62cd65c9e 100644 (file)
@@ -1412,6 +1412,7 @@ ReadCommonPlan(Plan *local_node)
        READ_FLOAT_FIELD(total_cost);
        READ_FLOAT_FIELD(plan_rows);
        READ_INT_FIELD(plan_width);
+       READ_BOOL_FIELD(parallel_aware);
        READ_INT_FIELD(plan_node_id);
        READ_NODE_FIELD(targetlist);
        READ_NODE_FIELD(qual);
index 8fc1cfd15f5330a44c537eec49e5eecc93dac27f..47de4eeba8c1fef77cf154b263eb6616cc442622 100644 (file)
@@ -475,7 +475,7 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
        required_outer = rel->lateral_relids;
 
        /* Consider sequential scan */
-       add_path(rel, create_seqscan_path(root, rel, required_outer));
+       add_path(rel, create_seqscan_path(root, rel, required_outer, 0));
 
        /* Consider index scans */
        create_index_paths(root, rel);
index 1b61fd9d4eaa7206e3b6a1b46dfa022b2a2c16d6..e604992f73429b8ebb2e8371f7a2af9076346973 100644 (file)
@@ -181,10 +181,13 @@ clamp_row_est(double nrows)
  *
  * 'baserel' is the relation to be scanned
  * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
+ * 'nworkers' is the number of workers among which the work will be
+ *                     distributed if the scan is a parallel scan
  */
 void
 cost_seqscan(Path *path, PlannerInfo *root,
-                        RelOptInfo *baserel, ParamPathInfo *param_info)
+                        RelOptInfo *baserel, ParamPathInfo *param_info,
+                        int nworkers)
 {
        Cost            startup_cost = 0;
        Cost            run_cost = 0;
@@ -222,6 +225,16 @@ cost_seqscan(Path *path, PlannerInfo *root,
        cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple;
        run_cost += cpu_per_tuple * baserel->tuples;
 
+       /*
+        * Primitive parallel cost model.  Assume the leader will do half as much
+        * work as a regular worker, because it will also need to read the tuples
+        * returned by the workers when they percolate up to the gather node.
+        * This is almost certainly not exactly the right way to model this, so
+        * this will probably need to be changed at some point...
+        */
+       if (nworkers > 0)
+               run_cost = run_cost / (nworkers + 0.5);
+
        path->startup_cost = startup_cost;
        path->total_cost = startup_cost + run_cost;
 }
index e70a337328ebfef7b17299aeac2475b6c44f7b27..411b36c418ed2cf599b2c556a0534855ea30bf99 100644 (file)
@@ -101,7 +101,7 @@ static List *fix_indexorderby_references(PlannerInfo *root, IndexPath *index_pat
 static Node *fix_indexqual_operand(Node *node, IndexOptInfo *index, int indexcol);
 static List *get_switched_clauses(List *clauses, Relids outerrelids);
 static List *order_qual_clauses(PlannerInfo *root, List *clauses);
-static void copy_path_costsize(Plan *dest, Path *src);
+static void copy_generic_path_info(Plan *dest, Path *src);
 static void copy_plan_costsize(Plan *dest, Plan *src);
 static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid);
 static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid,
@@ -779,7 +779,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
         * prepare_sort_from_pathkeys on it before we do so on the individual
         * child plans, to make cross-checking the sort info easier.
         */
-       copy_path_costsize(plan, (Path *) best_path);
+       copy_generic_path_info(plan, (Path *) best_path);
        plan->targetlist = tlist;
        plan->qual = NIL;
        plan->lefttree = NULL;
@@ -901,7 +901,7 @@ create_material_plan(PlannerInfo *root, MaterialPath *best_path)
 
        plan = make_material(subplan);
 
-       copy_path_costsize(&plan->plan, (Path *) best_path);
+       copy_generic_path_info(&plan->plan, (Path *) best_path);
 
        return plan;
 }
@@ -1129,7 +1129,7 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
                                                          best_path->single_copy,
                                                          subplan);
 
-       copy_path_costsize(&gather_plan->plan, &best_path->path);
+       copy_generic_path_info(&gather_plan->plan, &best_path->path);
 
        /* use parallel mode for parallel plans. */
        root->glob->parallelModeNeeded = true;
@@ -1178,7 +1178,7 @@ create_seqscan_plan(PlannerInfo *root, Path *best_path,
                                                         scan_clauses,
                                                         scan_relid);
 
-       copy_path_costsize(&scan_plan->plan, best_path);
+       copy_generic_path_info(&scan_plan->plan, best_path);
 
        return scan_plan;
 }
@@ -1224,7 +1224,7 @@ create_samplescan_plan(PlannerInfo *root, Path *best_path,
                                                                scan_relid,
                                                                tsc);
 
-       copy_path_costsize(&scan_plan->scan.plan, best_path);
+       copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
        return scan_plan;
 }
@@ -1422,7 +1422,7 @@ create_indexscan_plan(PlannerInfo *root,
                                                                                        indexorderbyops,
                                                                                        best_path->indexscandir);
 
-       copy_path_costsize(&scan_plan->plan, &best_path->path);
+       copy_generic_path_info(&scan_plan->plan, &best_path->path);
 
        return scan_plan;
 }
@@ -1538,7 +1538,7 @@ create_bitmap_scan_plan(PlannerInfo *root,
                                                                         bitmapqualorig,
                                                                         baserelid);
 
-       copy_path_costsize(&scan_plan->scan.plan, &best_path->path);
+       copy_generic_path_info(&scan_plan->scan.plan, &best_path->path);
 
        return scan_plan;
 }
@@ -1795,7 +1795,7 @@ create_tidscan_plan(PlannerInfo *root, TidPath *best_path,
                                                         scan_relid,
                                                         tidquals);
 
-       copy_path_costsize(&scan_plan->scan.plan, &best_path->path);
+       copy_generic_path_info(&scan_plan->scan.plan, &best_path->path);
 
        return scan_plan;
 }
@@ -1836,7 +1836,7 @@ create_subqueryscan_plan(PlannerInfo *root, Path *best_path,
                                                                  scan_relid,
                                                                  best_path->parent->subplan);
 
-       copy_path_costsize(&scan_plan->scan.plan, best_path);
+       copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
        return scan_plan;
 }
@@ -1879,7 +1879,7 @@ create_functionscan_plan(PlannerInfo *root, Path *best_path,
        scan_plan = make_functionscan(tlist, scan_clauses, scan_relid,
                                                                  functions, rte->funcordinality);
 
-       copy_path_costsize(&scan_plan->scan.plan, best_path);
+       copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
        return scan_plan;
 }
@@ -1923,7 +1923,7 @@ create_valuesscan_plan(PlannerInfo *root, Path *best_path,
        scan_plan = make_valuesscan(tlist, scan_clauses, scan_relid,
                                                                values_lists);
 
-       copy_path_costsize(&scan_plan->scan.plan, best_path);
+       copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
        return scan_plan;
 }
@@ -2016,7 +2016,7 @@ create_ctescan_plan(PlannerInfo *root, Path *best_path,
        scan_plan = make_ctescan(tlist, scan_clauses, scan_relid,
                                                         plan_id, cte_param_id);
 
-       copy_path_costsize(&scan_plan->scan.plan, best_path);
+       copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
        return scan_plan;
 }
@@ -2076,7 +2076,7 @@ create_worktablescan_plan(PlannerInfo *root, Path *best_path,
        scan_plan = make_worktablescan(tlist, scan_clauses, scan_relid,
                                                                   cteroot->wt_param_id);
 
-       copy_path_costsize(&scan_plan->scan.plan, best_path);
+       copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
        return scan_plan;
 }
@@ -2132,7 +2132,7 @@ create_foreignscan_plan(PlannerInfo *root, ForeignPath *best_path,
                                                                                                tlist, scan_clauses);
 
        /* Copy cost data from Path to Plan; no need to make FDW do this */
-       copy_path_costsize(&scan_plan->scan.plan, &best_path->path);
+       copy_generic_path_info(&scan_plan->scan.plan, &best_path->path);
 
        /* Copy foreign server OID; likewise, no need to make FDW do this */
        scan_plan->fs_server = rel->serverid;
@@ -2238,7 +2238,7 @@ create_customscan_plan(PlannerInfo *root, CustomPath *best_path,
         * Copy cost data from Path to Plan; no need to make custom-plan providers
         * do this
         */
-       copy_path_costsize(&cplan->scan.plan, &best_path->path);
+       copy_generic_path_info(&cplan->scan.plan, &best_path->path);
 
        /* Likewise, copy the relids that are represented by this custom scan */
        cplan->custom_relids = best_path->path.parent->relids;
@@ -2355,7 +2355,7 @@ create_nestloop_plan(PlannerInfo *root,
                                                          inner_plan,
                                                          best_path->jointype);
 
-       copy_path_costsize(&join_plan->join.plan, &best_path->path);
+       copy_generic_path_info(&join_plan->join.plan, &best_path->path);
 
        return join_plan;
 }
@@ -2650,7 +2650,7 @@ create_mergejoin_plan(PlannerInfo *root,
                                                           best_path->jpath.jointype);
 
        /* Costs of sort and material steps are included in path cost already */
-       copy_path_costsize(&join_plan->join.plan, &best_path->jpath.path);
+       copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
 
        return join_plan;
 }
@@ -2775,7 +2775,7 @@ create_hashjoin_plan(PlannerInfo *root,
                                                          (Plan *) hash_plan,
                                                          best_path->jpath.jointype);
 
-       copy_path_costsize(&join_plan->join.plan, &best_path->jpath.path);
+       copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
 
        return join_plan;
 }
@@ -3411,9 +3411,11 @@ order_qual_clauses(PlannerInfo *root, List *clauses)
 /*
  * Copy cost and size info from a Path node to the Plan node created from it.
  * The executor usually won't use this info, but it's needed by EXPLAIN.
+ *
+ * Also copy the parallel-aware flag, which the executor will use.
  */
 static void
-copy_path_costsize(Plan *dest, Path *src)
+copy_generic_path_info(Plan *dest, Path *src)
 {
        if (src)
        {
@@ -3421,6 +3423,7 @@ copy_path_costsize(Plan *dest, Path *src)
                dest->total_cost = src->total_cost;
                dest->plan_rows = src->rows;
                dest->plan_width = src->parent->width;
+               dest->parallel_aware = src->parallel_aware;
        }
        else
        {
@@ -3428,6 +3431,7 @@ copy_path_costsize(Plan *dest, Path *src)
                dest->total_cost = 0;
                dest->plan_rows = 0;
                dest->plan_width = 0;
+               dest->parallel_aware = false;
        }
 }
 
index 536b55e4930557fc08ae93f7fc130aa44873ad8f..fa1ab3a46c3614daa46f49e6b6920113c9409847 100644 (file)
@@ -4690,7 +4690,7 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid)
        comparisonCost = 2.0 * (indexExprCost.startup + indexExprCost.per_tuple);
 
        /* Estimate the cost of seq scan + sort */
-       seqScanPath = create_seqscan_path(root, rel, NULL);
+       seqScanPath = create_seqscan_path(root, rel, NULL, 0);
        cost_sort(&seqScanAndSortPath, root, NIL,
                          seqScanPath->total_cost, rel->tuples, rel->width,
                          comparisonCost, maintenance_work_mem, -1.0);
index 1895a6894a37081b6c8278314cb42af113b8fcf0..09c32445462d497b5542a3cba0d752ab2aa7989d 100644 (file)
@@ -696,7 +696,8 @@ add_path_precheck(RelOptInfo *parent_rel,
  *       pathnode.
  */
 Path *
-create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer)
+create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
+                                       Relids required_outer, int nworkers)
 {
        Path       *pathnode = makeNode(Path);
 
@@ -704,9 +705,10 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer)
        pathnode->parent = rel;
        pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                                                                         required_outer);
+       pathnode->parallel_aware = nworkers > 0 ? true : false;
        pathnode->pathkeys = NIL;       /* seqscan has unordered result */
 
-       cost_seqscan(pathnode, root, rel, pathnode->param_info);
+       cost_seqscan(pathnode, root, rel, pathnode->param_info, nworkers);
 
        return pathnode;
 }
@@ -724,6 +726,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer
        pathnode->parent = rel;
        pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                                                                         required_outer);
+       pathnode->parallel_aware = false;
        pathnode->pathkeys = NIL;       /* samplescan has unordered result */
 
        cost_samplescan(pathnode, root, rel, pathnode->param_info);
@@ -777,6 +780,7 @@ create_index_path(PlannerInfo *root,
        pathnode->path.parent = rel;
        pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                                                                                  required_outer);
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = pathkeys;
 
        /* Convert clauses to indexquals the executor can handle */
@@ -822,6 +826,7 @@ create_bitmap_heap_path(PlannerInfo *root,
        pathnode->path.parent = rel;
        pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                                                                                  required_outer);
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = NIL;          /* always unordered */
 
        pathnode->bitmapqual = bitmapqual;
@@ -847,6 +852,7 @@ create_bitmap_and_path(PlannerInfo *root,
        pathnode->path.pathtype = T_BitmapAnd;
        pathnode->path.parent = rel;
        pathnode->path.param_info = NULL;       /* not used in bitmap trees */
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = NIL;          /* always unordered */
 
        pathnode->bitmapquals = bitmapquals;
@@ -871,6 +877,7 @@ create_bitmap_or_path(PlannerInfo *root,
        pathnode->path.pathtype = T_BitmapOr;
        pathnode->path.parent = rel;
        pathnode->path.param_info = NULL;       /* not used in bitmap trees */
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = NIL;          /* always unordered */
 
        pathnode->bitmapquals = bitmapquals;
@@ -895,6 +902,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
        pathnode->path.parent = rel;
        pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                                                                                  required_outer);
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = NIL;          /* always unordered */
 
        pathnode->tidquals = tidquals;
@@ -922,6 +930,7 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
        pathnode->path.parent = rel;
        pathnode->path.param_info = get_appendrel_parampathinfo(rel,
                                                                                                                        required_outer);
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = NIL;          /* result is always considered
                                                                                 * unsorted */
        pathnode->subpaths = subpaths;
@@ -975,6 +984,7 @@ create_merge_append_path(PlannerInfo *root,
        pathnode->path.parent = rel;
        pathnode->path.param_info = get_appendrel_parampathinfo(rel,
                                                                                                                        required_outer);
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = pathkeys;
        pathnode->subpaths = subpaths;
 
@@ -1049,6 +1059,7 @@ create_result_path(List *quals)
        pathnode->path.pathtype = T_Result;
        pathnode->path.parent = NULL;
        pathnode->path.param_info = NULL;       /* there are no other rels... */
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = NIL;
        pathnode->quals = quals;
 
@@ -1082,6 +1093,7 @@ create_material_path(RelOptInfo *rel, Path *subpath)
        pathnode->path.pathtype = T_Material;
        pathnode->path.parent = rel;
        pathnode->path.param_info = subpath->param_info;
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = subpath->pathkeys;
 
        pathnode->subpath = subpath;
@@ -1142,6 +1154,7 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
        pathnode->path.pathtype = T_Unique;
        pathnode->path.parent = rel;
        pathnode->path.param_info = subpath->param_info;
+       pathnode->path.parallel_aware = false;
 
        /*
         * Assume the output is unsorted, since we don't necessarily have pathkeys
@@ -1323,6 +1336,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
        pathnode->path.parent = rel;
        pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                                                                                  required_outer);
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = NIL;          /* Gather has unordered result */
 
        pathnode->subpath = subpath;
@@ -1378,6 +1392,7 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel,
        pathnode->parent = rel;
        pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                                                                         required_outer);
+       pathnode->parallel_aware = false;
        pathnode->pathkeys = pathkeys;
 
        cost_subqueryscan(pathnode, root, rel, pathnode->param_info);
@@ -1400,6 +1415,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel,
        pathnode->parent = rel;
        pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                                                                         required_outer);
+       pathnode->parallel_aware = false;
        pathnode->pathkeys = pathkeys;
 
        cost_functionscan(pathnode, root, rel, pathnode->param_info);
@@ -1422,6 +1438,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel,
        pathnode->parent = rel;
        pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                                                                         required_outer);
+       pathnode->parallel_aware = false;
        pathnode->pathkeys = NIL;       /* result is always unordered */
 
        cost_valuesscan(pathnode, root, rel, pathnode->param_info);
@@ -1443,6 +1460,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer)
        pathnode->parent = rel;
        pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                                                                         required_outer);
+       pathnode->parallel_aware = false;
        pathnode->pathkeys = NIL;       /* XXX for now, result is always unordered */
 
        cost_ctescan(pathnode, root, rel, pathnode->param_info);
@@ -1465,6 +1483,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
        pathnode->parent = rel;
        pathnode->param_info = get_baserel_parampathinfo(root, rel,
                                                                                                         required_outer);
+       pathnode->parallel_aware = false;
        pathnode->pathkeys = NIL;       /* result is always unordered */
 
        /* Cost is the same as for a regular CTE scan */
@@ -1496,6 +1515,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
        pathnode->path.parent = rel;
        pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                                                                                  required_outer);
+       pathnode->path.parallel_aware = false;
        pathnode->path.rows = rows;
        pathnode->path.startup_cost = startup_cost;
        pathnode->path.total_cost = total_cost;
@@ -1630,6 +1650,7 @@ create_nestloop_path(PlannerInfo *root,
                                                                  sjinfo,
                                                                  required_outer,
                                                                  &restrict_clauses);
+       pathnode->path.parallel_aware = false;
        pathnode->path.pathkeys = pathkeys;
        pathnode->jointype = jointype;
        pathnode->outerjoinpath = outer_path;
@@ -1687,6 +1708,7 @@ create_mergejoin_path(PlannerInfo *root,
                                                                  sjinfo,
                                                                  required_outer,
                                                                  &restrict_clauses);
+       pathnode->jpath.path.parallel_aware = false;
        pathnode->jpath.path.pathkeys = pathkeys;
        pathnode->jpath.jointype = jointype;
        pathnode->jpath.outerjoinpath = outer_path;
@@ -1743,6 +1765,7 @@ create_hashjoin_path(PlannerInfo *root,
                                                                  sjinfo,
                                                                  required_outer,
                                                                  &restrict_clauses);
+       pathnode->jpath.path.parallel_aware = false;
 
        /*
         * A hashjoin never has pathkeys, since its output ordering is
@@ -1798,7 +1821,7 @@ reparameterize_path(PlannerInfo *root, Path *path,
        switch (path->pathtype)
        {
                case T_SeqScan:
-                       return create_seqscan_path(root, rel, required_outer);
+                       return create_seqscan_path(root, rel, required_outer, 0);
                case T_SampleScan:
                        return (Path *) create_samplescan_path(root, rel, required_outer);
                case T_IndexScan:
index 39d12a62fcd17560fc538f27ca0bd16dc39f7aba..f8f9299b6524442654709562f3332ea995b7b502 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef NODESEQSCAN_H
 #define NODESEQSCAN_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);
@@ -21,4 +22,9 @@ extern TupleTableSlot *ExecSeqScan(SeqScanState *node);
 extern void ExecEndSeqScan(SeqScanState *node);
 extern void ExecReScanSeqScan(SeqScanState *node);
 
+/* parallel scan support */
+extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
+extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
+extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc);
+
 #endif   /* NODESEQSCAN_H */
index 58ec889b2f02fefd122633cc6aa587900bd4958d..eb3591a663f5d316242f9fa669c582bf76c1f339 100644 (file)
@@ -1248,11 +1248,15 @@ typedef struct ScanState
        TupleTableSlot *ss_ScanTupleSlot;
 } ScanState;
 
-/*
- * SeqScan uses a bare ScanState as its state node, since it needs
- * no additional fields.
+/* ----------------
+ *      SeqScanState information
+ * ----------------
  */
-typedef ScanState SeqScanState;
+typedef struct SeqScanState
+{
+       ScanState       ss;                             /* its first field is NodeTag */
+       Size            pscan_len;              /* size of parallel heap scan descriptor */
+} SeqScanState;
 
 /* ----------------
  *      SampleScanState information
index 6b28c8e28f4ddd846fc59f2af5966c4e0b41277e..292219db51f541de86b352077ccdb75185778ed1 100644 (file)
@@ -108,6 +108,11 @@ typedef struct Plan
        double          plan_rows;              /* number of rows plan is expected to emit */
        int                     plan_width;             /* average row width in bytes */
 
+       /*
+        * information needed for parallel query
+        */
+       bool            parallel_aware; /* engage parallel-aware logic? */
+
        /*
         * Common structural data for all Plan types.
         */
index 6cf2e24ce7d30e06cb759d32f64962723ef28dd5..d7406cc614988b46d674de321e9d0f3faad091b9 100644 (file)
@@ -753,6 +753,7 @@ typedef struct Path
 
        RelOptInfo *parent;                     /* the relation this path can build */
        ParamPathInfo *param_info;      /* parameterization info, or NULL if none */
+       bool            parallel_aware; /* engage parallel-aware logic? */
 
        /* estimated size/costs for path (see costsize.c for more info) */
        double          rows;                   /* estimated number of result tuples */
index 25a730362a845bbe74b6cb1a2f3ae271daf4f2e0..ac21a3a181016163745e4d85292c54d6f8ab7c66 100644 (file)
@@ -72,7 +72,7 @@ extern double clamp_row_est(double nrows);
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
                                        double index_pages, PlannerInfo *root);
 extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
-                        ParamPathInfo *param_info);
+                        ParamPathInfo *param_info, int nworkers);
 extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
                                ParamPathInfo *param_info);
 extern void cost_index(IndexPath *path, PlannerInfo *root,
index 7a4940c7d20bf77a7a21f79bdda096c67d1e8340..f28b4e2b06330ac4e07360900e8849dd328f97c6 100644 (file)
@@ -31,7 +31,7 @@ extern bool add_path_precheck(RelOptInfo *parent_rel,
                                  List *pathkeys, Relids required_outer);
 
 extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
-                                       Relids required_outer);
+                                       Relids required_outer, int nworkers);
 extern Path *create_samplescan_path(PlannerInfo *root, RelOptInfo *rel,
                                           Relids required_outer);
 extern IndexPath *create_index_path(PlannerInfo *root,