Pass InitPlan values to workers via Gather (Merge).
author	Robert Haas <rhaas@postgresql.org>
	Thu, 16 Nov 2017 17:06:14 +0000 (12:06 -0500)
committer	Robert Haas <rhaas@postgresql.org>
	Thu, 16 Nov 2017 17:06:14 +0000 (12:06 -0500)
If a PARAM_EXEC parameter is used below a Gather (Merge) but the InitPlan
that computes it is attached to or above the Gather (Merge), force the
value to be computed before starting parallelism and pass it down to all
workers.  This allows us to use parallelism in cases where it previously
would have had to be rejected as unsafe.  In this case we do lose the
optimization that the value is only computed if it's actually used.  An
alternative strategy would be to have the first worker that needs the value
compute it, but one downside of that approach is that we'd then need to
select a parallel-safe path to compute the parameter value; it couldn't,
for example, contain a Gather (Merge) node.  At some point in the future,
we might want to consider both approaches.

Independent of that consideration, there is a great deal more work that
could be done to make more kinds of PARAM_EXEC parameters parallel-safe.
This infrastructure could be used to allow a Gather (Merge) on the inner
side of a nested loop (although that's not a very appealing plan) and
cases where the InitPlan is attached below the Gather (Merge) could be
addressed as well using various techniques.  But this is a good start.

Amit Kapila, reviewed and revised by me.  Review and testing by
Kuntal Ghosh, Haribabu Kommi, and Tushar Ahuja.

Discussion: http://postgr.es/m/CAA4eK1LV0Y1AUV4cUCdC+sYOx0Z0-8NAJ2Pd9=UKsbQ5Sr7+JQ@mail.gmail.com
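
For example (from the regression test added below), the following query can
now use a parallel plan even though the initplan that computes the subquery's
value is attached above the Gather:

    select count(*) from tenk1
     where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2);

The initplan is evaluated once in the leader before the workers start, and
its value is passed down to all of them.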

17 files changed:
src/backend/commands/explain.c
src/backend/executor/execExprInterp.c
src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c
src/backend/executor/nodeGatherMerge.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/plan/planner.c
src/backend/optimizer/plan/setrefs.c
src/backend/optimizer/util/clauses.c
src/include/executor/execExpr.h
src/include/executor/execParallel.h
src/include/nodes/plannodes.h
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index 8f7062cd6eab5f20b54d02ea3d7397eb0b91d695..447f69d044e96f713f525131665fc5cdfb1b0a26 100644 (file)
@@ -107,6 +107,7 @@ static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 static void show_instrumentation_count(const char *qlabel, int which,
                                                   PlanState *planstate, ExplainState *es);
 static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es);
+static void show_eval_params(Bitmapset *bms_params, ExplainState *es);
 static const char *explain_get_index_name(Oid indexId);
 static void show_buffer_usage(ExplainState *es, const BufferUsage *usage);
 static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir,
@@ -1441,6 +1442,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
                                                                                           planstate, es);
                                ExplainPropertyInteger("Workers Planned",
                                                                           gather->num_workers, es);
+
+                               /* Show params evaluated at gather node */
+                               if (gather->initParam)
+                                       show_eval_params(gather->initParam, es);
+
                                if (es->analyze)
                                {
                                        int                     nworkers;
@@ -1463,6 +1469,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
                                                                                           planstate, es);
                                ExplainPropertyInteger("Workers Planned",
                                                                           gm->num_workers, es);
+
+                               /* Show params evaluated at gather-merge node */
+                               if (gm->initParam)
+                                       show_eval_params(gm->initParam, es);
+
                                if (es->analyze)
                                {
                                        int                     nworkers;
@@ -2487,6 +2498,29 @@ show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es)
        }
 }
 
+/*
+ * Show initplan params evaluated at Gather or Gather Merge node.
+ */
+static void
+show_eval_params(Bitmapset *bms_params, ExplainState *es)
+{
+       int                     paramid = -1;
+       List       *params = NIL;
+
+       Assert(bms_params);
+
+       while ((paramid = bms_next_member(bms_params, paramid)) >= 0)
+       {
+               char            param[32];
+
+               snprintf(param, sizeof(param), "$%d", paramid);
+               params = lappend(params, pstrdup(param));
+       }
+
+       if (params)
+               ExplainPropertyList("Params Evaluated", params, es);
+}
+
 /*
  * Fetch the name of an index in an EXPLAIN
  *
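
With the explain.c changes above, EXPLAIN output for a Gather or Gather
Merge node now lists any initplan params evaluated at that node, e.g. (from
the regression output below):

    ->  Gather
          Workers Planned: 4
          Params Evaluated: $2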
index a0f537b706bf3ce0a0baa2cbae08387958d9ec57..6c4612dad4a6555e888848b14b9ba1e31f26244d 100644 (file)
@@ -1926,6 +1926,33 @@ ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
        *op->resnull = prm->isnull;
 }
 
+/*
+ * ExecEvalParamExecParams
+ *
+ * Execute the subplans for the given PARAM_EXEC initplan params, if they
+ * have not been executed already.
+ */
+void
+ExecEvalParamExecParams(Bitmapset *params, EState *estate)
+{
+       ParamExecData *prm;
+       int                     paramid;
+
+       paramid = -1;
+       while ((paramid = bms_next_member(params, paramid)) >= 0)
+       {
+               prm = &(estate->es_param_exec_vals[paramid]);
+
+               if (prm->execPlan != NULL)
+               {
+                       /* Parameter not evaluated yet, so go do it */
+                       ExecSetParamPlan(prm->execPlan, GetPerTupleExprContext(estate));
+                       /* ExecSetParamPlan should have processed this param... */
+                       Assert(prm->execPlan == NULL);
+               }
+       }
+}
+
 /*
  * Evaluate a PARAM_EXTERN parameter.
  *
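
A sketch of how this new entry point is used (see the execParallel.c changes
below): before launching or relaunching workers, the leader forces any
initplan params it intends to send to be computed, then serializes their
values into DSA storage:

    ExecEvalParamExecParams(sendParams, estate);
    ...
    if (!bms_is_empty(sendParams))
    {
        pei->param_exec = SerializeParamExecParams(estate, sendParams);
        fpes->param_exec = pei->param_exec;
    }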
index fd7e7cbf3d3516412aeea3906eeaa79176ea87a6..c4355506378e9f9840ef6f56d8c4041615b53126 100644 (file)
@@ -23,6 +23,7 @@
 
 #include "postgres.h"
 
+#include "executor/execExpr.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
 #include "executor/nodeBitmapHeapscan.h"
@@ -38,7 +39,9 @@
 #include "optimizer/planner.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
+#include "utils/datum.h"
 #include "utils/dsa.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "pgstat.h"
@@ -50,7 +53,7 @@
  */
 #define PARALLEL_KEY_EXECUTOR_FIXED            UINT64CONST(0xE000000000000001)
 #define PARALLEL_KEY_PLANNEDSTMT               UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_PARAMS                            UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_PARAMLISTINFO             UINT64CONST(0xE000000000000003)
 #define PARALLEL_KEY_BUFFER_USAGE              UINT64CONST(0xE000000000000004)
 #define PARALLEL_KEY_TUPLE_QUEUE               UINT64CONST(0xE000000000000005)
 #define PARALLEL_KEY_INSTRUMENTATION   UINT64CONST(0xE000000000000006)
@@ -65,6 +68,7 @@
 typedef struct FixedParallelExecutorState
 {
        int64           tuples_needed;  /* tuple bound, see ExecSetTupleBound */
+       dsa_pointer param_exec;
 } FixedParallelExecutorState;
 
 /*
@@ -266,6 +270,133 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
        return planstate_tree_walker(planstate, ExecParallelEstimate, e);
 }
 
+/*
+ * Estimate the amount of space required to serialize the indicated parameters.
+ */
+static Size
+EstimateParamExecSpace(EState *estate, Bitmapset *params)
+{
+       int                     paramid;
+       Size            sz = sizeof(int);
+
+       paramid = -1;
+       while ((paramid = bms_next_member(params, paramid)) >= 0)
+       {
+               Oid                     typeOid;
+               int16           typLen;
+               bool            typByVal;
+               ParamExecData *prm;
+
+               prm = &(estate->es_param_exec_vals[paramid]);
+               typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
+                                                          paramid);
+
+               sz = add_size(sz, sizeof(int)); /* space for paramid */
+
+               /* space for datum/isnull */
+               if (OidIsValid(typeOid))
+                       get_typlenbyval(typeOid, &typLen, &typByVal);
+               else
+               {
+                       /* If no type OID, assume by-value, like copyParamList does. */
+                       typLen = sizeof(Datum);
+                       typByVal = true;
+               }
+               sz = add_size(sz,
+                                         datumEstimateSpace(prm->value, prm->isnull,
+                                                                                typByVal, typLen));
+       }
+       return sz;
+}
+
+/*
+ * Serialize specified PARAM_EXEC parameters.
+ *
+ * We write the number of parameters first, as a 4-byte integer, and then
+ * write details for each parameter in turn.  The details for each parameter
+ * consist of a 4-byte paramid (location of param in execution time internal
+ * parameter array) and then the datum as serialized by datumSerialize().
+ */
+static dsa_pointer
+SerializeParamExecParams(EState *estate, Bitmapset *params)
+{
+       Size            size;
+       int                     nparams;
+       int                     paramid;
+       ParamExecData *prm;
+       dsa_pointer handle;
+       char       *start_address;
+
+       /* Allocate enough space for the current parameter values. */
+       size = EstimateParamExecSpace(estate, params);
+       handle = dsa_allocate(estate->es_query_dsa, size);
+       start_address = dsa_get_address(estate->es_query_dsa, handle);
+
+       /* First write the number of parameters as a 4-byte integer. */
+       nparams = bms_num_members(params);
+       memcpy(start_address, &nparams, sizeof(int));
+       start_address += sizeof(int);
+
+       /* Write details for each parameter in turn. */
+       paramid = -1;
+       while ((paramid = bms_next_member(params, paramid)) >= 0)
+       {
+               Oid                     typeOid;
+               int16           typLen;
+               bool            typByVal;
+
+               prm = &(estate->es_param_exec_vals[paramid]);
+               typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
+                                                          paramid);
+
+               /* Write paramid. */
+               memcpy(start_address, &paramid, sizeof(int));
+               start_address += sizeof(int);
+
+               /* Write datum/isnull */
+               if (OidIsValid(typeOid))
+                       get_typlenbyval(typeOid, &typLen, &typByVal);
+               else
+               {
+                       /* If no type OID, assume by-value, like copyParamList does. */
+                       typLen = sizeof(Datum);
+                       typByVal = true;
+               }
+               datumSerialize(prm->value, prm->isnull, typByVal, typLen,
+                                          &start_address);
+       }
+
+       return handle;
+}
+
+/*
+ * Restore specified PARAM_EXEC parameters.
+ */
+static void
+RestoreParamExecParams(char *start_address, EState *estate)
+{
+       int                     nparams;
+       int                     i;
+       int                     paramid;
+
+       memcpy(&nparams, start_address, sizeof(int));
+       start_address += sizeof(int);
+
+       for (i = 0; i < nparams; i++)
+       {
+               ParamExecData *prm;
+
+               /* Read paramid */
+               memcpy(&paramid, start_address, sizeof(int));
+               start_address += sizeof(int);
+               prm = &(estate->es_param_exec_vals[paramid]);
+
+               /* Read datum/isnull. */
+               prm->value = datumRestore(&start_address, &prm->isnull);
+               prm->execPlan = NULL;
+       }
+}
+
 /*
  * Initialize the dynamic shared memory segment that will be used to control
  * parallel execution.
@@ -395,7 +526,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+ExecInitParallelPlan(PlanState *planstate, EState *estate,
+                                        Bitmapset *sendParams, int nworkers,
                                         int64 tuples_needed)
 {
        ParallelExecutorInfo *pei;
@@ -405,17 +537,20 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
        FixedParallelExecutorState *fpes;
        char       *pstmt_data;
        char       *pstmt_space;
-       char       *param_space;
+       char       *paramlistinfo_space;
        BufferUsage *bufusage_space;
        SharedExecutorInstrumentation *instrumentation = NULL;
        int                     pstmt_len;
-       int                     param_len;
+       int                     paramlistinfo_len;
        int                     instrumentation_len = 0;
        int                     instrument_offset = 0;
        Size            dsa_minsize = dsa_minimum_size();
        char       *query_string;
        int                     query_len;
 
+       /* Force parameters we're going to pass to workers to be evaluated. */
+       ExecEvalParamExecParams(sendParams, estate);
+
        /* Allocate object for return value. */
        pei = palloc0(sizeof(ParallelExecutorInfo));
        pei->finished = false;
@@ -450,8 +585,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
        shm_toc_estimate_keys(&pcxt->estimator, 1);
 
        /* Estimate space for serialized ParamListInfo. */
-       param_len = EstimateParamListSpace(estate->es_param_list_info);
-       shm_toc_estimate_chunk(&pcxt->estimator, param_len);
+       paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
+       shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
 
        /*
@@ -511,6 +646,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
        /* Store fixed-size state. */
        fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
        fpes->tuples_needed = tuples_needed;
+       fpes->param_exec = InvalidDsaPointer;
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
 
        /* Store query string */
@@ -524,9 +660,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
 
        /* Store serialized ParamListInfo. */
-       param_space = shm_toc_allocate(pcxt->toc, param_len);
-       shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
-       SerializeParamList(estate->es_param_list_info, &param_space);
+       paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
+       shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
+       SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
 
        /* Allocate space for each worker's BufferUsage; no need to initialize. */
        bufusage_space = shm_toc_allocate(pcxt->toc,
@@ -577,13 +713,25 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
                pei->area = dsa_create_in_place(area_space, dsa_minsize,
                                                                                LWTRANCHE_PARALLEL_QUERY_DSA,
                                                                                pcxt->seg);
-       }
 
-       /*
-        * Make the area available to executor nodes running in the leader.  See
-        * also ParallelQueryMain which makes it available to workers.
-        */
-       estate->es_query_dsa = pei->area;
+               /*
+                * Make the area available to executor nodes running in the leader.
+                * See also ParallelQueryMain which makes it available to workers.
+                */
+               estate->es_query_dsa = pei->area;
+
+               /*
+                * Serialize parameters, if any, using DSA storage.  We don't dare use
+                * the main parallel query DSM for this because we might relaunch
+                * workers after the values have changed (and thus the amount of
+                * storage required has changed).
+                */
+               if (!bms_is_empty(sendParams))
+               {
+                       pei->param_exec = SerializeParamExecParams(estate, sendParams);
+                       fpes->param_exec = pei->param_exec;
+               }
+       }
 
        /*
         * Give parallel-aware nodes a chance to initialize their shared data.
@@ -640,16 +788,39 @@ ExecParallelCreateReaders(ParallelExecutorInfo *pei)
  */
 void
 ExecParallelReinitialize(PlanState *planstate,
-                                                ParallelExecutorInfo *pei)
+                                                ParallelExecutorInfo *pei,
+                                                Bitmapset *sendParams)
 {
+       EState     *estate = planstate->state;
+       FixedParallelExecutorState *fpes;
+
        /* Old workers must already be shut down */
        Assert(pei->finished);
 
+       /* Force parameters we're going to pass to workers to be evaluated. */
+       ExecEvalParamExecParams(sendParams, estate);
+
        ReinitializeParallelDSM(pei->pcxt);
        pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
        pei->reader = NULL;
        pei->finished = false;
 
+       fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
+       /* Free any serialized parameters from the last round. */
+       if (DsaPointerIsValid(fpes->param_exec))
+       {
+               dsa_free(estate->es_query_dsa, fpes->param_exec);
+               fpes->param_exec = InvalidDsaPointer;
+       }
+
+       /* Serialize current parameter values if required. */
+       if (!bms_is_empty(sendParams))
+       {
+               pei->param_exec = SerializeParamExecParams(estate, sendParams);
+               fpes->param_exec = pei->param_exec;
+       }
+
        /* Traverse plan tree and let each child node reset associated state. */
        ExecParallelReInitializeDSM(planstate, pei->pcxt);
 }
@@ -831,6 +1002,12 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+       /* Free any serialized parameters. */
+       if (DsaPointerIsValid(pei->param_exec))
+       {
+               dsa_free(pei->area, pei->param_exec);
+               pei->param_exec = InvalidDsaPointer;
+       }
        if (pei->area != NULL)
        {
                dsa_detach(pei->area);
@@ -882,7 +1059,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
        pstmt = (PlannedStmt *) stringToNode(pstmtspace);
 
        /* Reconstruct ParamListInfo. */
-       paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false);
+       paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
        paramLI = RestoreParamList(&paramspace);
 
        /*
@@ -1046,6 +1223,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 
        /* Special executor initialization steps for parallel workers */
        queryDesc->planstate->state->es_query_dsa = area;
+       if (DsaPointerIsValid(fpes->param_exec))
+       {
+               char       *paramexec_space;
+
+               paramexec_space = dsa_get_address(area, fpes->param_exec);
+               RestoreParamExecParams(paramexec_space, queryDesc->estate);
+       }
        ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
        /* Pass down any tuple bound */
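
For reference, the DSA chunk written by SerializeParamExecParams and read
back by RestoreParamExecParams has the following layout (a sketch; field
names are illustrative only):

    int32  nparams;       /* number of parameters that follow */
    /* then, repeated nparams times: */
    int32  paramid;       /* index into es_param_exec_vals */
    ...                   /* datum bytes as written by datumSerialize() */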
index 0298c65d06502e1254c52618e0d758f63758c535..07c62d2feab9bc617caa65aac7102b91d4f9e08a 100644 (file)
@@ -160,11 +160,13 @@ ExecGather(PlanState *pstate)
                        if (!node->pei)
                                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                                                                 estate,
+                                                                                                gather->initParam,
                                                                                                 gather->num_workers,
                                                                                                 node->tuples_needed);
                        else
                                ExecParallelReinitialize(node->ps.lefttree,
-                                                                                node->pei);
+                                                                                node->pei,
+                                                                                gather->initParam);
 
                        /*
                         * Register backend workers. We might not get as many as we
index 7206ab919758c7fc05133ad60f183a052c998c3f..7dd655c4489358989e4a1d608368f9bfe2bc1b22 100644 (file)
@@ -203,11 +203,13 @@ ExecGatherMerge(PlanState *pstate)
                        if (!node->pei)
                                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                                                                 estate,
+                                                                                                gm->initParam,
                                                                                                 gm->num_workers,
                                                                                                 node->tuples_needed);
                        else
                                ExecParallelReinitialize(node->ps.lefttree,
-                                                                                node->pei);
+                                                                                node->pei,
+                                                                                gm->initParam);
 
                        /* Try to launch workers. */
                        pcxt = node->pei->pcxt;
index 76e75459b46a781aa423e53baae62391cce29edd..d9ff8a7e510a2f0027f19e8d4d1ba6c57408aa53 100644 (file)
@@ -364,6 +364,7 @@ _copyGather(const Gather *from)
        COPY_SCALAR_FIELD(rescan_param);
        COPY_SCALAR_FIELD(single_copy);
        COPY_SCALAR_FIELD(invisible);
+       COPY_BITMAPSET_FIELD(initParam);
 
        return newnode;
 }
@@ -391,6 +392,7 @@ _copyGatherMerge(const GatherMerge *from)
        COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
        COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
        COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+       COPY_BITMAPSET_FIELD(initParam);
 
        return newnode;
 }
index dc35df9e4fe11a4d26cf92e48a57f9d654424a4e..c97ee24ade15904374bb5db4c152cb4a5a2e2674 100644 (file)
@@ -487,6 +487,7 @@ _outGather(StringInfo str, const Gather *node)
        WRITE_INT_FIELD(rescan_param);
        WRITE_BOOL_FIELD(single_copy);
        WRITE_BOOL_FIELD(invisible);
+       WRITE_BITMAPSET_FIELD(initParam);
 }
 
 static void
@@ -517,6 +518,8 @@ _outGatherMerge(StringInfo str, const GatherMerge *node)
        appendStringInfoString(str, " :nullsFirst");
        for (i = 0; i < node->numCols; i++)
                appendStringInfo(str, " %s", booltostr(node->nullsFirst[i]));
+
+       WRITE_BITMAPSET_FIELD(initParam);
 }
 
 static void
index 593658dd8a8910841bfbe5febf83569cd49ea3e4..7eb67fc040787e3709b384131ef68f2cf822b28d 100644 (file)
@@ -2172,6 +2172,7 @@ _readGather(void)
        READ_INT_FIELD(rescan_param);
        READ_BOOL_FIELD(single_copy);
        READ_BOOL_FIELD(invisible);
+       READ_BITMAPSET_FIELD(initParam);
 
        READ_DONE();
 }
@@ -2193,6 +2194,7 @@ _readGatherMerge(void)
        READ_OID_ARRAY(sortOperators, local_node->numCols);
        READ_OID_ARRAY(collations, local_node->numCols);
        READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+       READ_BITMAPSET_FIELD(initParam);
 
        READ_DONE();
 }
index 9c74e39bd3485e4cac9ab22f7de93080d69f87d4..d4454779ee94d71899cfd1579713bc9383c1743e 100644 (file)
@@ -6279,6 +6279,7 @@ make_gather(List *qptlist,
        node->rescan_param = rescan_param;
        node->single_copy = single_copy;
        node->invisible = false;
+       node->initParam = NULL;
 
        return node;
 }
index 4c00a1453bd07542685485997914bd1de7f6e335..f6b8bbf5fa67c7bfd9ce4371e458cc93dc7a1512 100644 (file)
@@ -377,6 +377,14 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
        {
                Gather     *gather = makeNode(Gather);
 
+               /*
+                * If there are any initPlans attached to the formerly-top plan node,
+                * move them up to the Gather node; same as we do for Material node in
+                * materialize_finished_plan.
+                */
+               gather->plan.initPlan = top_plan->initPlan;
+               top_plan->initPlan = NIL;
+
                gather->plan.targetlist = top_plan->targetlist;
                gather->plan.qual = NIL;
                gather->plan.lefttree = top_plan;
index fa9a3f0b47ba0dbca434f6d429e8939974d0a68e..28a7f7ec456274ce38b244a3dd38dcd15f7de967 100644 (file)
@@ -107,6 +107,7 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context);
 static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context);
 static void set_join_references(PlannerInfo *root, Join *join, int rtoffset);
 static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset);
+static void set_param_references(PlannerInfo *root, Plan *plan);
 static Node *convert_combining_aggrefs(Node *node, void *context);
 static void set_dummy_tlist_references(Plan *plan, int rtoffset);
 static indexed_tlist *build_tlist_index(List *tlist);
@@ -632,7 +633,10 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 
                case T_Gather:
                case T_GatherMerge:
-                       set_upper_references(root, plan, rtoffset);
+                       {
+                               set_upper_references(root, plan, rtoffset);
+                               set_param_references(root, plan);
+                       }
                        break;
 
                case T_Hash:
@@ -1781,6 +1785,51 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset)
        pfree(subplan_itlist);
 }
 
+/*
+ * set_param_references
+ *       Initialize the initParam set of a Gather or Gather Merge node so that
+ *       it contains references to all the params that need to be evaluated
+ *       before the node is executed, i.e. the initplan params that are passed
+ *       down to the plan nodes below it.
+ */
+static void
+set_param_references(PlannerInfo *root, Plan *plan)
+{
+       Assert(IsA(plan, Gather) || IsA(plan, GatherMerge));
+
+       if (plan->lefttree->extParam)
+       {
+               PlannerInfo *proot;
+               Bitmapset  *initSetParam = NULL;
+               ListCell   *l;
+
+               for (proot = root; proot != NULL; proot = proot->parent_root)
+               {
+                       foreach(l, proot->init_plans)
+                       {
+                               SubPlan    *initsubplan = (SubPlan *) lfirst(l);
+                               ListCell   *l2;
+
+                               foreach(l2, initsubplan->setParam)
+                               {
+                                       initSetParam = bms_add_member(initSetParam, lfirst_int(l2));
+                               }
+                       }
+               }
+
+               /*
+                * Remember the set of all external initplan params that are used by
+                * the children of the Gather or Gather Merge node.
+                */
+               if (IsA(plan, Gather))
+                       ((Gather *) plan)->initParam =
+                               bms_intersect(plan->lefttree->extParam, initSetParam);
+               else
+                       ((GatherMerge *) plan)->initParam =
+                               bms_intersect(plan->lefttree->extParam, initSetParam);
+       }
+}
+
 /*
  * Recursively scan an expression tree and convert Aggrefs to the proper
  * intermediate form for combining aggregates.  This means (1) replacing each
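
To illustrate set_param_references with a made-up example (not taken from
the patch): if the Gather's child plan has extParam = {0, 2} and the
initplans at this query level or above set params {2, 3}, then
bms_intersect(extParam, initSetParam) = {2}, so only param 2 is recorded in
initParam and later evaluated at the Gather and shipped to the workers.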
index 66e098f488a2007b950bbc98d06b956d21ea0369..d14ef31eae21a7fc2c2f91c2163b2b0ee29a3208 100644 (file)
@@ -1087,6 +1087,8 @@ bool
 is_parallel_safe(PlannerInfo *root, Node *node)
 {
        max_parallel_hazard_context context;
+       PlannerInfo *proot;
+       ListCell   *l;
 
        /*
         * Even if the original querytree contained nothing unsafe, we need to
@@ -1101,6 +1103,25 @@ is_parallel_safe(PlannerInfo *root, Node *node)
        context.max_hazard = PROPARALLEL_SAFE;
        context.max_interesting = PROPARALLEL_RESTRICTED;
        context.safe_param_ids = NIL;
+
+       /*
+        * Params that refer to the same or a parent query level are considered
+        * parallel-safe.  The idea is that we compute such params at the Gather
+        * or Gather Merge node and pass their values down to the workers.
+        */
+       for (proot = root; proot != NULL; proot = proot->parent_root)
+       {
+               foreach(l, proot->init_plans)
+               {
+                       SubPlan    *initsubplan = (SubPlan *) lfirst(l);
+                       ListCell   *l2;
+
+                       foreach(l2, initsubplan->setParam)
+                               context.safe_param_ids = lcons_int(lfirst_int(l2),
+                                                                                       context.safe_param_ids);
+               }
+       }
+
        return !max_parallel_hazard_walker(node, &context);
 }
 
@@ -1225,7 +1246,8 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
         * We can't pass Params to workers at the moment either, so they are also
         * parallel-restricted, unless they are PARAM_EXTERN Params or are
         * PARAM_EXEC Params listed in safe_param_ids, meaning they could be
-        * generated within the worker.
+        * either generated within the worker or computed in the leader and then
+        * passed down to the worker.
         */
        else if (IsA(node, Param))
        {
index 78d22478166fac22fc474931ec7c76ac3887d498..5bbb63a9d80233eebaa3a1d8da787873cd938bec 100644 (file)
@@ -609,6 +609,7 @@ extern ExprEvalOp ExecEvalStepOp(ExprState *state, ExprEvalStep *op);
  */
 extern void ExecEvalParamExec(ExprState *state, ExprEvalStep *op,
                                  ExprContext *econtext);
+extern void ExecEvalParamExecParams(Bitmapset *params, EState *estate);
 extern void ExecEvalParamExtern(ExprState *state, ExprEvalStep *op,
                                        ExprContext *econtext);
 extern void ExecEvalSQLValueFunction(ExprState *state, ExprEvalStep *op);
index e1b3e7af1f74283791f7b1c60a22d173f98b8b05..99a13f3b7d1f251c6b5f44196d89d6d5adbacade 100644 (file)
@@ -28,6 +28,7 @@ typedef struct ParallelExecutorInfo
        BufferUsage *buffer_usage;      /* points to bufusage area in DSM */
        SharedExecutorInstrumentation *instrumentation; /* optional */
        dsa_area   *area;                       /* points to DSA area in DSM */
+       dsa_pointer     param_exec;             /* serialized PARAM_EXEC parameters */
        bool            finished;               /* set true by ExecParallelFinish */
        /* These two arrays have pcxt->nworkers_launched entries: */
        shm_mq_handle **tqueue;         /* tuple queues for worker output */
@@ -35,12 +36,13 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-                                        EState *estate, int nworkers, int64 tuples_needed);
+                                        EState *estate, Bitmapset *sendParams, int nworkers,
+                                        int64 tuples_needed);
 extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(PlanState *planstate,
-                                                ParallelExecutorInfo *pei);
+                                                ParallelExecutorInfo *pei, Bitmapset *sendParams);
 
 extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
 
index a127682b0e7827277f09e571929f7c3423ea0628..9b38d44ba0293b54e5e4e7fa34e1a7333ece66ed 100644 (file)
@@ -841,6 +841,8 @@ typedef struct Gather
        int                     rescan_param;   /* ID of Param that signals a rescan, or -1 */
        bool            single_copy;    /* don't execute plan more than once */
        bool            invisible;              /* suppress EXPLAIN display (for testing)? */
+       Bitmapset  *initParam;          /* param IDs of initplans that are
+                                                                * referenced at this node or below it */
 } Gather;
 
 /* ------------
@@ -858,6 +860,8 @@ typedef struct GatherMerge
        Oid                *sortOperators;      /* OIDs of operators to sort them by */
        Oid                *collations;         /* OIDs of collations */
        bool       *nullsFirst;         /* NULLS FIRST/LAST directions */
+       Bitmapset  *initParam;          /* param IDs of initplans that are
+                                                                * referenced at this node or below it */
 } GatherMerge;
 
 /* ----------------
index 06aeddd80552d0ff66a351294766458cbe013b5d..d1d5b228ce0996b93b9716738c44358ad3e1cd59 100644 (file)
@@ -201,6 +201,41 @@ explain (costs off)
      ->  Seq Scan on tenk2
 (4 rows)
 
+alter table tenk2 reset (parallel_workers);
+-- test parallel plan for a query containing initplan.
+set enable_indexscan = off;
+set enable_indexonlyscan = off;
+set enable_bitmapscan = off;
+alter table tenk2 set (parallel_workers = 2);
+explain (costs off)
+       select count(*) from tenk1
+        where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2);
+                      QUERY PLAN                      
+------------------------------------------------------
+ Aggregate
+   InitPlan 1 (returns $2)
+     ->  Finalize Aggregate
+           ->  Gather
+                 Workers Planned: 2
+                 ->  Partial Aggregate
+                       ->  Parallel Seq Scan on tenk2
+   ->  Gather
+         Workers Planned: 4
+         Params Evaluated: $2
+         ->  Parallel Seq Scan on tenk1
+               Filter: (unique1 = $2)
+(12 rows)
+
+select count(*) from tenk1
+    where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2);
+ count 
+-------
+     1
+(1 row)
+
+reset enable_indexscan;
+reset enable_indexonlyscan;
+reset enable_bitmapscan;
 alter table tenk2 reset (parallel_workers);
 -- test parallel index scans.
 set enable_seqscan to off;
index b701b35408e45b6b856c2af781380bcfd4526f63..bb4e34adbe0f1bfbafbfca0834bdae1e3e7dd222 100644 (file)
@@ -74,6 +74,23 @@ explain (costs off)
        (select ten from tenk2);
 alter table tenk2 reset (parallel_workers);
 
+-- test parallel plan for a query containing initplan.
+set enable_indexscan = off;
+set enable_indexonlyscan = off;
+set enable_bitmapscan = off;
+alter table tenk2 set (parallel_workers = 2);
+
+explain (costs off)
+       select count(*) from tenk1
+        where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2);
+select count(*) from tenk1
+    where tenk1.unique1 = (Select max(tenk2.unique1) from tenk2);
+
+reset enable_indexscan;
+reset enable_indexonlyscan;
+reset enable_bitmapscan;
+alter table tenk2 reset (parallel_workers);
+
 -- test parallel index scans.
 set enable_seqscan to off;
 set enable_bitmapscan to off;