Change representation of statement lists, and add statement location info.

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 99a9de3cdc397fde4dab1623cdb968508ff87445..e01fe6da96492f46d034c9522f88062646437d1c 100644 (file)
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -3,7 +3,7 @@
  * execParallel.c
  *       Support routines for parallel execution.
  *
- * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * This file contains routines that are intended to support setting up,
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeCustom.h"
+#include "executor/nodeForeignscan.h"
+#include "executor/nodeSeqscan.h"
 #include "executor/tqueue.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/planmain.h"
 #include "optimizer/planner.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
+#include "utils/dsa.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 
 #define PARALLEL_KEY_BUFFER_USAGE              UINT64CONST(0xE000000000000003)
 #define PARALLEL_KEY_TUPLE_QUEUE               UINT64CONST(0xE000000000000004)
 #define PARALLEL_KEY_INSTRUMENTATION   UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_DSA                               UINT64CONST(0xE000000000000006)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE              65536
 
-/* DSM structure for accumulating per-PlanState instrumentation. */
-typedef struct SharedPlanStateInstrumentation
-{
-       int plan_node_id;
-       slock_t mutex;
-       Instrumentation instr;
-} SharedPlanStateInstrumentation;
-
-/* DSM structure for accumulating per-PlanState instrumentation. */
+/*
+ * DSM structure for accumulating per-PlanState instrumentation.
+ *
+ * instrument_options: Same meaning here as in instrument.c.
+ *
+ * instrument_offset: Offset, relative to the start of this structure,
+ * of the first Instrumentation object.  This will depend on the length of
+ * the plan_node_id array.
+ *
+ * num_workers: Number of workers.
+ *
+ * num_plan_nodes: Number of plan nodes.
+ *
+ * plan_node_id: Array of plan nodes for which we are gathering instrumentation
+ * from parallel workers.  The length of this array is given by num_plan_nodes.
+ */
 struct SharedExecutorInstrumentation
 {
-       int instrument_options;
-       int ps_ninstrument;                     /* # of ps_instrument structures following */
-       SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER];
+       int                     instrument_options;
+       int                     instrument_offset;
+       int                     num_workers;
+       int                     num_plan_nodes;
+       int                     plan_node_id[FLEXIBLE_ARRAY_MEMBER];
+       /* array of num_plan_nodes * num_workers Instrumentation objects follows */
 };
+#define GetInstrumentationArray(sei) \
+       (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
+        (Instrumentation *) (((char *) sei) + sei->instrument_offset))
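The macro above hands back the base of a dense Instrumentation array laid out just past the plan_node_id array; entries are grouped by plan node, with num_workers consecutive slots per node. A minimal sketch of how one slot would be addressed (GetWorkerInstrumentation is a hypothetical helper, not part of this patch):

/*
 * Hypothetical helper: the Instrumentation for plan node index i and
 * worker w lives at array[i * num_workers + w].
 */
static Instrumentation *
GetWorkerInstrumentation(SharedExecutorInstrumentation *sei,
                         int plan_node_index, int worker_number)
{
    Instrumentation *array = GetInstrumentationArray(sei);

    Assert(plan_node_index < sei->num_plan_nodes);
    Assert(worker_number < sei->num_workers);
    return &array[plan_node_index * sei->num_workers + worker_number];
}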
 
 /* Context object for ExecParallelEstimate. */
 typedef struct ExecParallelEstimateContext
 {
        ParallelContext *pcxt;
-       int nnodes;
+       int                     nnodes;
 } ExecParallelEstimateContext;
 
-/* Context object for ExecParallelEstimate. */
+/* Context object for ExecParallelInitializeDSM. */
 typedef struct ExecParallelInitializeDSMContext
 {
        ParallelContext *pcxt;
        SharedExecutorInstrumentation *instrumentation;
-       int nnodes;
+       int                     nnodes;
 } ExecParallelInitializeDSMContext;
 
 /* Helper functions that run in the parallel leader. */
@@ -83,11 +101,11 @@ static char *ExecSerializePlan(Plan *plan, EState *estate);
 static bool ExecParallelEstimate(PlanState *node,
                                         ExecParallelEstimateContext *e);
 static bool ExecParallelInitializeDSM(PlanState *node,
-                                        ExecParallelInitializeDSMContext *d);
+                                                 ExecParallelInitializeDSMContext *d);
 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
                                                         bool reinitialize);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
-                                                 SharedExecutorInstrumentation *instrumentation);
+                                                        SharedExecutorInstrumentation *instrumentation);
 
 /* Helper functions that run in the parallel worker. */
 static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
@@ -129,21 +147,24 @@ ExecSerializePlan(Plan *plan, EState *estate)
        pstmt = makeNode(PlannedStmt);
        pstmt->commandType = CMD_SELECT;
        pstmt->queryId = 0;
-       pstmt->hasReturning = 0;
-       pstmt->hasModifyingCTE = 0;
-       pstmt->canSetTag = 1;
-       pstmt->transientPlan = 0;
+       pstmt->hasReturning = false;
+       pstmt->hasModifyingCTE = false;
+       pstmt->canSetTag = true;
+       pstmt->transientPlan = false;
+       pstmt->dependsOnRole = false;
+       pstmt->parallelModeNeeded = false;
        pstmt->planTree = plan;
        pstmt->rtable = estate->es_range_table;
        pstmt->resultRelations = NIL;
-       pstmt->utilityStmt = NULL;
        pstmt->subplans = NIL;
        pstmt->rewindPlanIDs = NULL;
        pstmt->rowMarks = NIL;
-       pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
        pstmt->relationOids = NIL;
        pstmt->invalItems = NIL;        /* workers can't replan anyway... */
-       pstmt->hasRowSecurity = false;
+       pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
+       pstmt->utilityStmt = NULL;
+       pstmt->stmt_location = -1;
+       pstmt->stmt_len = -1;
 
        /* Return serialized copy of our dummy PlannedStmt. */
        return nodeToString(pstmt);
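The string built here is read back inside the worker (see ExecParallelGetQueryDesc further down); a minimal sketch of that reverse step, assuming toc is the worker's table of contents and PARALLEL_KEY_PLANNEDSTMT is the key this file defines for the serialized plan (the #define sits above the portion shown in this diff):

    char       *pstmtspace;
    PlannedStmt *pstmt;

    /* Fetch the serialized plan and rebuild the node tree from it. */
    pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
    pstmt = (PlannedStmt *) stringToNode(pstmtspace);
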
@@ -167,20 +188,34 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
        /* Count this node. */
        e->nnodes++;
 
-       /*
-        * XXX. Call estimators for parallel-aware nodes here, when we have
-        * some.
-        */
+       /* Call estimators for parallel-aware nodes. */
+       if (planstate->plan->parallel_aware)
+       {
+               switch (nodeTag(planstate))
+               {
+                       case T_SeqScanState:
+                               ExecSeqScanEstimate((SeqScanState *) planstate,
+                                                                       e->pcxt);
+                               break;
+                       case T_ForeignScanState:
+                               ExecForeignScanEstimate((ForeignScanState *) planstate,
+                                                                               e->pcxt);
+                               break;
+                       case T_CustomScanState:
+                               ExecCustomScanEstimate((CustomScanState *) planstate,
+                                                                          e->pcxt);
+                               break;
+                       default:
+                               break;
+               }
+       }
 
        return planstate_tree_walker(planstate, ExecParallelEstimate, e);
 }
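The estimators called above only reserve space and keys in the DSM table of contents; the real implementations live in nodeSeqscan.c, nodeForeignscan.c and nodeCustom.c. A minimal sketch for a hypothetical parallel-aware FooScan node (FooScanState, ParallelFooScanDescData and the pscan_len field are assumptions for illustration):

/* Sketch: estimator for a hypothetical parallel-aware FooScan node. */
void
ExecFooScanEstimate(FooScanState *node, ParallelContext *pcxt)
{
    /* Remember how much shared state we will need, then reserve it. */
    node->pscan_len = sizeof(ParallelFooScanDescData);
    shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}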
 
 /*
- * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
- * may need to initialize shared state in the DSM before parallel workers
- * are available.  They can allocate the space they previous estimated using
- * shm_toc_allocate, and add the keys they previously estimated using
- * shm_toc_insert, in each case targeting pcxt->toc.
+ * Initialize the dynamic shared memory segment that will be used to control
+ * parallel execution.
  */
 static bool
 ExecParallelInitializeDSM(PlanState *planstate,
@@ -189,26 +224,43 @@ ExecParallelInitializeDSM(PlanState *planstate,
        if (planstate == NULL)
                return false;
 
-       /* If instrumentation is enabled, initialize array slot for this node. */
+       /* If instrumentation is enabled, initialize slot for this node. */
        if (d->instrumentation != NULL)
-       {
-               SharedPlanStateInstrumentation *instrumentation;
-
-               instrumentation = &d->instrumentation->ps_instrument[d->nnodes];
-               Assert(d->nnodes < d->instrumentation->ps_ninstrument);
-               instrumentation->plan_node_id = planstate->plan->plan_node_id;
-               SpinLockInit(&instrumentation->mutex);
-               InstrInit(&instrumentation->instr,
-                                 d->instrumentation->instrument_options);
-       }
+               d->instrumentation->plan_node_id[d->nnodes] =
+                       planstate->plan->plan_node_id;
 
        /* Count this node. */
        d->nnodes++;
 
        /*
-        * XXX. Call initializers for parallel-aware plan nodes, when we have
-        * some.
+        * Call initializers for parallel-aware plan nodes.
+        *
+        * Ordinary plan nodes won't do anything here, but parallel-aware plan
+        * nodes may need to initialize shared state in the DSM before parallel
+        * workers are available.  They can allocate the space they previously
+        * estimated using shm_toc_allocate, and add the keys they previously
+        * estimated using shm_toc_insert, in each case targeting pcxt->toc.
         */
+       if (planstate->plan->parallel_aware)
+       {
+               switch (nodeTag(planstate))
+               {
+                       case T_SeqScanState:
+                               ExecSeqScanInitializeDSM((SeqScanState *) planstate,
+                                                                                d->pcxt);
+                               break;
+                       case T_ForeignScanState:
+                               ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
+                                                                                        d->pcxt);
+                               break;
+                       case T_CustomScanState:
+                               ExecCustomScanInitializeDSM((CustomScanState *) planstate,
+                                                                                       d->pcxt);
+                               break;
+                       default:
+                               break;
+               }
+       }
 
        return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
 }
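The matching DSM initializer allocates the space reserved during estimation and publishes it in the table of contents; parallel-aware scans conventionally key their shared state by plan_node_id. Continuing the hypothetical FooScan sketch (same assumed names as above, with FooScanState assumed to embed a ScanState as ss):

/* Sketch: DSM initializer for the hypothetical FooScan node. */
void
ExecFooScanInitializeDSM(FooScanState *node, ParallelContext *pcxt)
{
    ParallelFooScanDesc pscan;

    pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
    /* ... fill in the shared scan state here ... */
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
    node->pscan = pscan;
}
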
@@ -239,7 +291,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
        if (!reinitialize)
                tqueuespace =
                        shm_toc_allocate(pcxt->toc,
-                                                        PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+                                                        mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
+                                                                         pcxt->nworkers));
        else
                tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
 
@@ -248,7 +301,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
        {
                shm_mq     *mq;
 
-               mq = shm_mq_create(tqueuespace + i * PARALLEL_TUPLE_QUEUE_SIZE,
+               mq = shm_mq_create(tqueuespace +
+                                                  ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
                                                   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
 
                shm_mq_set_receiver(mq, MyProc);
@@ -264,13 +318,15 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 }
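Note the switch to mul_size() in the allocation above: it performs the product in Size arithmetic and reports an error on overflow instead of silently wrapping, and the explicit (Size) cast on the loop index keeps the pointer arithmetic out of int range. The sizing pattern, in isolation:

    /* Overflow-checked equivalent of PARALLEL_TUPLE_QUEUE_SIZE * nworkers. */
    Size    tqueue_bytes = mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers);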
 
 /*
- * Re-initialize the response queues for backend workers to return tuples
- * to the main backend and start the workers.
+ * Re-initialize the parallel executor info such that it can be reused by
+ * workers.
  */
-shm_mq_handle **
-ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+void
+ExecParallelReinitialize(ParallelExecutorInfo *pei)
 {
-       return ExecParallelSetupTupleQueues(pcxt, true);
+       ReinitializeParallelDSM(pei->pcxt);
+       pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+       pei->finished = false;
 }
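A hedged sketch of the calling pattern: at the time of this patch the caller is the Gather node's rescan path, which shuts its workers down and then reuses the same ParallelExecutorInfo for the next execution (GatherState and its pei field are not shown in this diff):

    /* Sketch: a rescan reuses the existing parallel executor info, if any. */
    if (gatherstate->pei != NULL)
        ExecParallelReinitialize(gatherstate->pei);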
 
 /*
@@ -292,9 +348,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        int                     pstmt_len;
        int                     param_len;
        int                     instrumentation_len = 0;
+       int                     instrument_offset = 0;
+       Size            dsa_minsize = dsa_minimum_size();
 
        /* Allocate object for return value. */
        pei = palloc0(sizeof(ParallelExecutorInfo));
+       pei->finished = false;
        pei->planstate = planstate;
 
        /* Fix up and serialize plan to be sent to workers. */
@@ -328,17 +387,17 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
         * looking at pgBufferUsage, so do it unconditionally.
         */
        shm_toc_estimate_chunk(&pcxt->estimator,
-                                                  sizeof(BufferUsage) * pcxt->nworkers);
+                                                  mul_size(sizeof(BufferUsage), pcxt->nworkers));
        shm_toc_estimate_keys(&pcxt->estimator, 1);
 
        /* Estimate space for tuple queues. */
        shm_toc_estimate_chunk(&pcxt->estimator,
-                                                  PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+                                               mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
        shm_toc_estimate_keys(&pcxt->estimator, 1);
 
        /*
-        * Give parallel-aware nodes a chance to add to the estimates, and get
-        * count of how many PlanState nodes there are.
+        * Give parallel-aware nodes a chance to add to the estimates, and get a
+        * count of how many PlanState nodes there are.
         */
        e.pcxt = pcxt;
        e.nnodes = 0;
@@ -348,12 +407,21 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        if (estate->es_instrument)
        {
                instrumentation_len =
-                       offsetof(SharedExecutorInstrumentation, ps_instrument)
-                       + sizeof(SharedPlanStateInstrumentation) * e.nnodes;
+                       offsetof(SharedExecutorInstrumentation, plan_node_id) +
+                       sizeof(int) * e.nnodes;
+               instrumentation_len = MAXALIGN(instrumentation_len);
+               instrument_offset = instrumentation_len;
+               instrumentation_len +=
+                       mul_size(sizeof(Instrumentation),
+                                        mul_size(e.nnodes, nworkers));
                shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
                shm_toc_estimate_keys(&pcxt->estimator, 1);
        }
 
+       /* Estimate space for DSA area. */
+       shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+
        /* Everyone's had a chance to ask for space, so now create the DSM. */
        InitializeParallelDSM(pcxt);
 
@@ -377,7 +445,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 
        /* Allocate space for each worker's BufferUsage; no need to initialize. */
        bufusage_space = shm_toc_allocate(pcxt->toc,
-                                                                         sizeof(BufferUsage) * pcxt->nworkers);
+                                                         mul_size(sizeof(BufferUsage), pcxt->nworkers));
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
        pei->buffer_usage = bufusage_space;
 
@@ -385,20 +453,50 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
        /*
-        * If instrumentation options were supplied, allocate space for the
-        * data.  It only gets partially initialized here; the rest happens
-        * during ExecParallelInitializeDSM.
+        * If instrumentation options were supplied, allocate space for the data.
+        * It only gets partially initialized here; the rest happens during
+        * ExecParallelInitializeDSM.
         */
        if (estate->es_instrument)
        {
+               Instrumentation *instrument;
+               int                     i;
+
                instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
                instrumentation->instrument_options = estate->es_instrument;
-               instrumentation->ps_ninstrument = e.nnodes;
+               instrumentation->instrument_offset = instrument_offset;
+               instrumentation->num_workers = nworkers;
+               instrumentation->num_plan_nodes = e.nnodes;
+               instrument = GetInstrumentationArray(instrumentation);
+               for (i = 0; i < nworkers * e.nnodes; ++i)
+                       InstrInit(&instrument[i], estate->es_instrument);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
                                           instrumentation);
                pei->instrumentation = instrumentation;
        }
 
+       /*
+        * Create a DSA area that can be used by the leader and all workers.
+        * (However, if we failed to create a DSM and are using private memory
+        * instead, then skip this.)
+        */
+       if (pcxt->seg != NULL)
+       {
+               char       *area_space;
+
+               area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
+               shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
+               pei->area = dsa_create_in_place(area_space, dsa_minsize,
+                                                                               LWTRANCHE_PARALLEL_QUERY_DSA,
+                                                                               pcxt->seg);
+       }
+
+       /*
+        * Make the area available to executor nodes running in the leader.  See
+        * also ParallelQueryMain which makes it available to workers.
+        */
+       estate->es_query_dsa = pei->area;
+
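With estate->es_query_dsa in place, an executor node in the leader (or, via the assignment in ParallelQueryMain below, in a worker) can carve per-query shared memory out of the DSA area without going through the shm_toc. A minimal sketch of that usage; SharedFooState and the surrounding names are hypothetical, and es_query_dsa stays NULL when the leader fell back to private memory:

    if (estate->es_query_dsa != NULL)
    {
        dsa_pointer     handle;
        SharedFooState *shared;     /* hypothetical shared-state struct */

        handle = dsa_allocate(estate->es_query_dsa, sizeof(SharedFooState));
        shared = dsa_get_address(estate->es_query_dsa, handle);
        /* ... initialize *shared and hand "handle" to the workers ... */
    }
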
        /*
         * Give parallel-aware nodes a chance to initialize their shared data.
         * This also initializes the elements of instrumentation->ps_instrument,
@@ -421,27 +519,48 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 }
 
 /*
- * Copy instrumentation information about this node and its descendents from
+ * Copy instrumentation information about this node and its descendants from
  * dynamic shared memory.
  */
 static bool
 ExecParallelRetrieveInstrumentation(PlanState *planstate,
-                                                 SharedExecutorInstrumentation *instrumentation)
+                                                         SharedExecutorInstrumentation *instrumentation)
 {
-       int             i;
-       int             plan_node_id = planstate->plan->plan_node_id;
-       SharedPlanStateInstrumentation *ps_instrument;
+       Instrumentation *instrument;
+       int                     i;
+       int                     n;
+       int                     ibytes;
+       int                     plan_node_id = planstate->plan->plan_node_id;
+       MemoryContext oldcontext;
 
        /* Find the instrumentation for this node. */
-       for (i = 0; i < instrumentation->ps_ninstrument; ++i)
-               if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+       for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+               if (instrumentation->plan_node_id[i] == plan_node_id)
                        break;
-       if (i >= instrumentation->ps_ninstrument)
+       if (i >= instrumentation->num_plan_nodes)
                elog(ERROR, "plan node %d not found", plan_node_id);
 
-       /* No need to acquire the spinlock here; workers have exited already. */
-       ps_instrument = &instrumentation->ps_instrument[i];
-       InstrAggNode(planstate->instrument, &ps_instrument->instr);
+       /* Accumulate the statistics from all workers. */
+       instrument = GetInstrumentationArray(instrumentation);
+       instrument += i * instrumentation->num_workers;
+       for (n = 0; n < instrumentation->num_workers; ++n)
+               InstrAggNode(planstate->instrument, &instrument[n]);
+
+       /*
+        * Also store the per-worker detail.
+        *
+        * Worker instrumentation should be allocated in the same context as
+        * the regular instrumentation information, which is the per-query
+        * context. Switch into per-query memory context.
+        */
+       oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
+       ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
+       planstate->worker_instrument =
+               palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
+       MemoryContextSwitchTo(oldcontext);
+
+       planstate->worker_instrument->num_workers = instrumentation->num_workers;
+       memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
 
        return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
                                                                 instrumentation);
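The per-worker copy saved in planstate->worker_instrument is what later consumers (for example EXPLAIN's per-worker output) can walk; a minimal sketch of reading it back:

    WorkerInstrumentation *wi = planstate->worker_instrument;
    int         n;

    for (n = 0; n < wi->num_workers; ++n)
    {
        Instrumentation *in = &wi->instrument[n];

        /* in->nloops, in->ntuples and in->total describe worker n's share. */
    }
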
@@ -454,23 +573,28 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
 {
-       int             i;
+       int                     i;
+
+       if (pei->finished)
+               return;
 
        /* First, wait for the workers to finish. */
        WaitForParallelWorkersToFinish(pei->pcxt);
 
        /* Next, accumulate buffer usage. */
-       for (i = 0; i < pei->pcxt->nworkers; ++i)
+       for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
                InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
        /* Finally, accumulate instrumentation, if any. */
        if (pei->instrumentation)
                ExecParallelRetrieveInstrumentation(pei->planstate,
                                                                                        pei->instrumentation);
+
+       pei->finished = true;
 }
 
 /*
- * Clean up whatever ParallelExecutreInfo resources still exist after
+ * Clean up whatever ParallelExecutorInfo resources still exist after
  * ExecParallelFinish.  We separate these routines because someone might
  * want to examine the contents of the DSM after ExecParallelFinish and
  * before calling this routine.
@@ -478,6 +602,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+       if (pei->area != NULL)
+       {
+               dsa_detach(pei->area);
+               pei->area = NULL;
+       }
        if (pei->pcxt != NULL)
        {
                DestroyParallelContext(pei->pcxt);
@@ -538,49 +667,88 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
 }
 
 /*
- * Copy instrumentation information from this node and its descendents into
+ * Copy instrumentation information from this node and its descendants into
  * dynamic shared memory, so that the parallel leader can retrieve it.
  */
 static bool
 ExecParallelReportInstrumentation(PlanState *planstate,
-                                                 SharedExecutorInstrumentation *instrumentation)
+                                                         SharedExecutorInstrumentation *instrumentation)
 {
-       int             i;
-       int             plan_node_id = planstate->plan->plan_node_id;
-       SharedPlanStateInstrumentation *ps_instrument;
+       int                     i;
+       int                     plan_node_id = planstate->plan->plan_node_id;
+       Instrumentation *instrument;
+
+       InstrEndLoop(planstate->instrument);
 
        /*
         * If we shuffled the plan_node_id values in ps_instrument into sorted
-        * order, we could use binary search here.  This might matter someday
-        * if we're pushing down sufficiently large plan trees.  For now, do it
-        * the slow, dumb way.
+        * order, we could use binary search here.  This might matter someday if
+        * we're pushing down sufficiently large plan trees.  For now, do it the
+        * slow, dumb way.
         */
-       for (i = 0; i < instrumentation->ps_ninstrument; ++i)
-               if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+       for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+               if (instrumentation->plan_node_id[i] == plan_node_id)
                        break;
-       if (i >= instrumentation->ps_ninstrument)
+       if (i >= instrumentation->num_plan_nodes)
                elog(ERROR, "plan node %d not found", plan_node_id);
 
        /*
-        * There's one SharedPlanStateInstrumentation per plan_node_id, so we
-        * must use a spinlock in case multiple workers report at the same time.
+        * Add our statistics to the per-node, per-worker totals.  It's possible
+        * that this could happen more than once if we relaunched workers.
         */
-       ps_instrument = &instrumentation->ps_instrument[i];
-       SpinLockAcquire(&ps_instrument->mutex);
-       InstrAggNode(&ps_instrument->instr, planstate->instrument);
-       SpinLockRelease(&ps_instrument->mutex);
+       instrument = GetInstrumentationArray(instrumentation);
+       instrument += i * instrumentation->num_workers;
+       Assert(IsParallelWorker());
+       Assert(ParallelWorkerNumber < instrumentation->num_workers);
+       InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
 
        return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
                                                                 instrumentation);
 }
 
+/*
+ * Initialize the PlanState and its descendants with the information
+ * retrieved from shared memory.  This has to be done once the PlanState
+ * is allocated and initialized by executor; that is, after ExecutorStart().
+ */
+static bool
+ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
+{
+       if (planstate == NULL)
+               return false;
+
+       /* Call initializers for parallel-aware plan nodes. */
+       if (planstate->plan->parallel_aware)
+       {
+               switch (nodeTag(planstate))
+               {
+                       case T_SeqScanState:
+                               ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
+                               break;
+                       case T_ForeignScanState:
+                               ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
+                                                                                               toc);
+                               break;
+                       case T_CustomScanState:
+                               ExecCustomScanInitializeWorker((CustomScanState *) planstate,
+                                                                                          toc);
+                               break;
+                       default:
+                               break;
+               }
+       }
+
+       return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
+}
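On the worker side, the initializer looks the shared state back up by the same plan_node_id key and attaches to it, completing the hypothetical FooScan sketch begun above:

/* Sketch: worker-side initializer for the hypothetical FooScan node. */
void
ExecFooScanInitializeWorker(FooScanState *node, shm_toc *toc)
{
    ParallelFooScanDesc pscan;

    pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
    /* ... attach this worker's scan to the shared state ... */
    node->pscan = pscan;
}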
+
 /*
  * Main entrypoint for parallel query worker processes.
  *
- * We reach this function from ParallelMain, so the setup necessary to create
- * a sensible parallel environment has already been done; ParallelMain worries
- * about stuff like the transaction state, combo CID mappings, and GUC values,
- * so we don't need to deal with any of that here.
+ * We reach this function from ParallelWorkerMain, so the setup necessary to
+ * create a sensible parallel environment has already been done;
+ * ParallelWorkerMain worries about stuff like the transaction state, combo
+ * CID mappings, and GUC values, so we don't need to deal with any of that
+ * here.
  *
  * Our job is to deal with concerns specific to the executor.  The parallel
  * group leader will have stored a serialized PlannedStmt, and it's our job
@@ -597,6 +765,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        QueryDesc  *queryDesc;
        SharedExecutorInstrumentation *instrumentation;
        int                     instrument_options = 0;
+       void       *area_space;
+       dsa_area   *area;
 
        /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
        receiver = ExecParallelGetReceiver(seg, toc);
@@ -608,9 +778,21 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        /* Prepare to track buffer usage during query execution. */
        InstrStartParallelQuery();
 
-       /* Start up the executor, have it run the plan, and then shut it down. */
+       /* Attach to the dynamic shared memory area. */
+       area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA);
+       area = dsa_attach_in_place(area_space, seg);
+
+       /* Start up the executor */
        ExecutorStart(queryDesc, 0);
+
+       /* Special executor initialization steps for parallel workers */
+       queryDesc->planstate->state->es_query_dsa = area;
+       ExecParallelInitializeWorker(queryDesc->planstate, toc);
+
+       /* Run the plan */
        ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+
+       /* Shut down the executor */
        ExecutorFinish(queryDesc);
 
        /* Report buffer usage during parallel execution. */
@@ -626,6 +808,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        ExecutorEnd(queryDesc);
 
        /* Cleanup. */
+       dsa_detach(area);
        FreeQueryDesc(queryDesc);
        (*receiver->rDestroy) (receiver);
 }