1 /*-------------------------------------------------------------------------
2  *
3  * execParallel.c
4  *        Support routines for parallel execution.
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * This file contains routines that are intended to support setting up,
10  * using, and tearing down a ParallelContext from within the PostgreSQL
11  * executor.  The ParallelContext machinery will handle starting the
12  * workers and ensuring that their state generally matches that of the
13  * leader; see src/backend/access/transam/README.parallel for details.
14  * However, we must save and restore relevant executor state, such as
15  * any ParamListInfo associated with the query, buffer usage info, and
16  * the actual plan to be passed down to the worker.
17  *
18  * IDENTIFICATION
19  *        src/backend/executor/execParallel.c
20  *
21  *-------------------------------------------------------------------------
22  */
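/*
 * Illustrative sketch (not part of the build): one plausible call sequence
 * for a leader node, such as the executor's Gather node, driving this
 * module.  The variable names below are placeholders for illustration, not
 * code copied from nodeGather.c.
 *
 *     ParallelExecutorInfo *pei;
 *
 *     pei = ExecInitParallelPlan(child_planstate, estate, nworkers);
 *     LaunchParallelWorkers(pei->pcxt);
 *     ... read tuples from the shm_mq handles in pei->tqueue ...
 *     ExecParallelFinish(pei);      wait for workers, collect statistics
 *     ExecParallelCleanup(pei);     tear down the parallel context and DSM
 */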
23
24 #include "postgres.h"
25
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeCustom.h"
29 #include "executor/nodeForeignscan.h"
30 #include "executor/nodeSeqscan.h"
31 #include "executor/tqueue.h"
32 #include "nodes/nodeFuncs.h"
33 #include "optimizer/planmain.h"
34 #include "optimizer/planner.h"
35 #include "storage/spin.h"
36 #include "tcop/tcopprot.h"
37 #include "utils/memutils.h"
38 #include "utils/snapmgr.h"
39
40 /*
41  * Magic numbers for parallel executor communication.  We use constants
42  * greater than any 32-bit integer here so that values < 2^32 can be used
43  * by individual parallel nodes to store their own state.
44  */
45 #define PARALLEL_KEY_PLANNEDSTMT                UINT64CONST(0xE000000000000001)
46 #define PARALLEL_KEY_PARAMS                             UINT64CONST(0xE000000000000002)
47 #define PARALLEL_KEY_BUFFER_USAGE               UINT64CONST(0xE000000000000003)
48 #define PARALLEL_KEY_TUPLE_QUEUE                UINT64CONST(0xE000000000000004)
49 #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000005)
50
51 #define PARALLEL_TUPLE_QUEUE_SIZE               65536
52
53 /*
54  * DSM structure for accumulating per-PlanState instrumentation.
55  *
56  * instrument_options: Same meaning here as in instrument.c.
57  *
58  * instrument_offset: Offset, relative to the start of this structure,
59  * of the first Instrumentation object.  This will depend on the length of
60  * the plan_node_id array.
61  *
62  * num_workers: Number of workers.
63  *
64  * num_plan_nodes: Number of plan nodes.
65  *
66  * plan_node_id: Array of plan node IDs for which we are gathering instrumentation
67  * from parallel workers.  The length of this array is given by num_plan_nodes.
68  */
69 struct SharedExecutorInstrumentation
70 {
71         int                     instrument_options;
72         int                     instrument_offset;
73         int                     num_workers;
74         int                     num_plan_nodes;
75         int                     plan_node_id[FLEXIBLE_ARRAY_MEMBER];
76         /* array of num_plan_nodes * num_workers Instrumentation objects follows */
77 };
78 #define GetInstrumentationArray(sei) \
79         (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
80          (Instrumentation *) (((char *) sei) + sei->instrument_offset))
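/*
 * Layout sketch of the shared instrumentation area, summarizing how it is
 * used later in this file (a reader's aid, not additional machinery): the
 * header and its plan_node_id[] array come first, and instrument_offset
 * then points at a dense array of num_plan_nodes * num_workers
 * Instrumentation structs.  The slot for the i'th entry of plan_node_id[]
 * and worker w is reached as:
 *
 *     Instrumentation *array = GetInstrumentationArray(sei);
 *     Instrumentation *slot = &array[i * sei->num_workers + w];
 */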
81
82 /* Context object for ExecParallelEstimate. */
83 typedef struct ExecParallelEstimateContext
84 {
85         ParallelContext *pcxt;
86         int                     nnodes;
87 } ExecParallelEstimateContext;
88
89 /* Context object for ExecParallelInitializeDSM. */
90 typedef struct ExecParallelInitializeDSMContext
91 {
92         ParallelContext *pcxt;
93         SharedExecutorInstrumentation *instrumentation;
94         int                     nnodes;
95 } ExecParallelInitializeDSMContext;
96
97 /* Helper functions that run in the parallel leader. */
98 static char *ExecSerializePlan(Plan *plan, EState *estate);
99 static bool ExecParallelEstimate(PlanState *node,
100                                          ExecParallelEstimateContext *e);
101 static bool ExecParallelInitializeDSM(PlanState *node,
102                                                   ExecParallelInitializeDSMContext *d);
103 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
104                                                          bool reinitialize);
105 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
106                                                          SharedExecutorInstrumentation *instrumentation);
107
108 /* Helper functions that run in the parallel worker. */
109 static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
110 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
111
112 /*
113  * Create a serialized representation of the plan to be sent to each worker.
114  */
115 static char *
116 ExecSerializePlan(Plan *plan, EState *estate)
117 {
118         PlannedStmt *pstmt;
119         ListCell   *tlist;
120
121         /* We can't scribble on the original plan, so make a copy. */
122         plan = copyObject(plan);
123
124         /*
125          * The worker will start its own copy of the executor, and that copy will
126          * insert a junk filter if the toplevel node has any resjunk entries. We
127          * don't want that to happen, because while resjunk columns shouldn't be
128          * sent back to the user, here the tuples are coming back to another
129          * backend which may very well need them.  So mutate the target list
130          * accordingly.  This is sort of a hack; there might be better ways to do
131          * this...
132          */
133         foreach(tlist, plan->targetlist)
134         {
135                 TargetEntry *tle = (TargetEntry *) lfirst(tlist);
136
137                 tle->resjunk = false;
138         }
139
140         /*
141          * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
142          * for our purposes, but the worker will need at least a minimal
143          * PlannedStmt to start the executor.
144          */
145         pstmt = makeNode(PlannedStmt);
146         pstmt->commandType = CMD_SELECT;
147         pstmt->queryId = 0;
148         pstmt->hasReturning = false;
149         pstmt->hasModifyingCTE = false;
150         pstmt->canSetTag = true;
151         pstmt->transientPlan = false;
152         pstmt->dependsOnRole = false;
153         pstmt->parallelModeNeeded = false;
154         pstmt->planTree = plan;
155         pstmt->rtable = estate->es_range_table;
156         pstmt->resultRelations = NIL;
157         pstmt->utilityStmt = NULL;
158         pstmt->subplans = NIL;
159         pstmt->rewindPlanIDs = NULL;
160         pstmt->rowMarks = NIL;
161         pstmt->relationOids = NIL;
162         pstmt->invalItems = NIL;        /* workers can't replan anyway... */
163         pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
164
165         /* Return serialized copy of our dummy PlannedStmt. */
166         return nodeToString(pstmt);
167 }
168
169 /*
170  * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
171  * may need some state which is shared across all parallel workers.  Before
172  * we size the DSM, give them a chance to call shm_toc_estimate_chunk or
173  * shm_toc_estimate_keys on &pcxt->estimator.
174  *
175  * While we're at it, count the number of PlanState nodes in the tree, so
176  * we know how many per-node instrumentation slots will be needed.
177  */
178 static bool
179 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
180 {
181         if (planstate == NULL)
182                 return false;
183
184         /* Count this node. */
185         e->nnodes++;
186
187         /* Call estimators for parallel-aware nodes. */
188         if (planstate->plan->parallel_aware)
189         {
190                 switch (nodeTag(planstate))
191                 {
192                         case T_SeqScanState:
193                                 ExecSeqScanEstimate((SeqScanState *) planstate,
194                                                                         e->pcxt);
195                                 break;
196                         case T_ForeignScanState:
197                                 ExecForeignScanEstimate((ForeignScanState *) planstate,
198                                                                                 e->pcxt);
199                                 break;
200                         case T_CustomScanState:
201                                 ExecCustomScanEstimate((CustomScanState *) planstate,
202                                                                            e->pcxt);
203                                 break;
204                         default:
205                                 break;
206                 }
207         }
208
209         return planstate_tree_walker(planstate, ExecParallelEstimate, e);
210 }
211
212 /*
213  * Initialize the dynamic shared memory segment that will be used to control
214  * parallel execution.
215  */
216 static bool
217 ExecParallelInitializeDSM(PlanState *planstate,
218                                                   ExecParallelInitializeDSMContext *d)
219 {
220         if (planstate == NULL)
221                 return false;
222
223         /* If instrumentation is enabled, initialize slot for this node. */
224         if (d->instrumentation != NULL)
225                 d->instrumentation->plan_node_id[d->nnodes] =
226                         planstate->plan->plan_node_id;
227
228         /* Count this node. */
229         d->nnodes++;
230
231         /*
232          * Call initializers for parallel-aware plan nodes.
233          *
234          * Ordinary plan nodes won't do anything here, but parallel-aware plan
235          * nodes may need to initialize shared state in the DSM before parallel
236          * workers are available.  They can allocate the space they previously
237          * estimated using shm_toc_allocate, and add the keys they previously
238          * estimated using shm_toc_insert, in each case targeting pcxt->toc.
239          */
240         if (planstate->plan->parallel_aware)
241         {
242                 switch (nodeTag(planstate))
243                 {
244                         case T_SeqScanState:
245                                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
246                                                                                  d->pcxt);
247                                 break;
248                         case T_ForeignScanState:
249                                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
250                                                                                          d->pcxt);
251                                 break;
252                         case T_CustomScanState:
253                                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
254                                                                                         d->pcxt);
255                                 break;
256                         default:
257                                 break;
258                 }
259         }
260
261         return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
262 }
263
264 /*
265  * Set up the response queues that parallel workers will use to return
266  * tuples to the main backend.
267  */
268 static shm_mq_handle **
269 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
270 {
271         shm_mq_handle **responseq;
272         char       *tqueuespace;
273         int                     i;
274
275         /* Skip this if no workers. */
276         if (pcxt->nworkers == 0)
277                 return NULL;
278
279         /* Allocate memory for shared memory queue handles. */
280         responseq = (shm_mq_handle **)
281                 palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
282
283         /*
284          * If not reinitializing, allocate space from the DSM for the queues;
285          * otherwise, find the already allocated space.
286          */
287         if (!reinitialize)
288                 tqueuespace =
289                         shm_toc_allocate(pcxt->toc,
290                                                          mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
291                                                                           pcxt->nworkers));
292         else
293                 tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
294
295         /* Create the queues, and become the receiver for each. */
296         for (i = 0; i < pcxt->nworkers; ++i)
297         {
298                 shm_mq     *mq;
299
300                 mq = shm_mq_create(tqueuespace +
301                                                    ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
302                                                    (Size) PARALLEL_TUPLE_QUEUE_SIZE);
303
304                 shm_mq_set_receiver(mq, MyProc);
305                 responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
306         }
307
308         /* Add array of queues to shm_toc, so others can find it. */
309         if (!reinitialize)
310                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
311
312         /* Return array of handles. */
313         return responseq;
314 }
315
316 /*
317  * Re-initialize the parallel executor info so that it can be reused with a
318  * fresh set of workers.
319  */
320 void
321 ExecParallelReinitialize(ParallelExecutorInfo *pei)
322 {
323         ReinitializeParallelDSM(pei->pcxt);
324         pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
325         pei->finished = false;
326 }
327
328 /*
329  * Sets up the required infrastructure for backend workers to perform
330  * execution and return results to the main backend.
331  */
332 ParallelExecutorInfo *
333 ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
334 {
335         ParallelExecutorInfo *pei;
336         ParallelContext *pcxt;
337         ExecParallelEstimateContext e;
338         ExecParallelInitializeDSMContext d;
339         char       *pstmt_data;
340         char       *pstmt_space;
341         char       *param_space;
342         BufferUsage *bufusage_space;
343         SharedExecutorInstrumentation *instrumentation = NULL;
344         int                     pstmt_len;
345         int                     param_len;
346         int                     instrumentation_len = 0;
347         int                     instrument_offset = 0;
348
349         /* Allocate object for return value. */
350         pei = palloc0(sizeof(ParallelExecutorInfo));
351         pei->finished = false;
352         pei->planstate = planstate;
353
354         /* Fix up and serialize plan to be sent to workers. */
355         pstmt_data = ExecSerializePlan(planstate->plan, estate);
356
357         /* Create a parallel context. */
358         pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
359         pei->pcxt = pcxt;
360
361         /*
362          * Before telling the parallel context to create a dynamic shared memory
363          * segment, we need to figure out how big it should be.  Estimate space
364          * for the various things we need to store.
365          */
366
367         /* Estimate space for serialized PlannedStmt. */
368         pstmt_len = strlen(pstmt_data) + 1;
369         shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
370         shm_toc_estimate_keys(&pcxt->estimator, 1);
371
372         /* Estimate space for serialized ParamListInfo. */
373         param_len = EstimateParamListSpace(estate->es_param_list_info);
374         shm_toc_estimate_chunk(&pcxt->estimator, param_len);
375         shm_toc_estimate_keys(&pcxt->estimator, 1);
376
377         /*
378          * Estimate space for BufferUsage.
379          *
380          * If EXPLAIN is not in use and there are no extensions loaded that care,
381          * we could skip this.  But we have no way of knowing whether anyone's
382          * looking at pgBufferUsage, so do it unconditionally.
383          */
384         shm_toc_estimate_chunk(&pcxt->estimator,
385                                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
386         shm_toc_estimate_keys(&pcxt->estimator, 1);
387
388         /* Estimate space for tuple queues. */
389         shm_toc_estimate_chunk(&pcxt->estimator,
390                                                 mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
391         shm_toc_estimate_keys(&pcxt->estimator, 1);
392
393         /*
394          * Give parallel-aware nodes a chance to add to the estimates, and get a
395          * count of how many PlanState nodes there are.
396          */
397         e.pcxt = pcxt;
398         e.nnodes = 0;
399         ExecParallelEstimate(planstate, &e);
400
401         /* Estimate space for instrumentation, if required. */
402         if (estate->es_instrument)
403         {
404                 instrumentation_len =
405                         offsetof(SharedExecutorInstrumentation, plan_node_id) +
406                         sizeof(int) * e.nnodes;
407                 instrumentation_len = MAXALIGN(instrumentation_len);
408                 instrument_offset = instrumentation_len;
409                 instrumentation_len +=
410                         mul_size(sizeof(Instrumentation),
411                                          mul_size(e.nnodes, nworkers));
412                 shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
413                 shm_toc_estimate_keys(&pcxt->estimator, 1);
414         }
415
416         /* Everyone's had a chance to ask for space, so now create the DSM. */
417         InitializeParallelDSM(pcxt);
418
419         /*
420          * OK, now we have a dynamic shared memory segment, and it should be big
421          * enough to store all of the data we estimated we would want to put into
422          * it, plus whatever general stuff (not specifically executor-related) the
423          * ParallelContext itself needs to store there.  None of the space we
424          * asked for has been allocated or initialized yet, though, so do that.
425          */
426
427         /* Store serialized PlannedStmt. */
428         pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
429         memcpy(pstmt_space, pstmt_data, pstmt_len);
430         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
431
432         /* Store serialized ParamListInfo. */
433         param_space = shm_toc_allocate(pcxt->toc, param_len);
434         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
435         SerializeParamList(estate->es_param_list_info, &param_space);
436
437         /* Allocate space for each worker's BufferUsage; no need to initialize. */
438         bufusage_space = shm_toc_allocate(pcxt->toc,
439                                                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
440         shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
441         pei->buffer_usage = bufusage_space;
442
443         /* Set up tuple queues. */
444         pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
445
446         /*
447          * If instrumentation options were supplied, allocate space for the data.
448          * It only gets partially initialized here; the rest happens during
449          * ExecParallelInitializeDSM.
450          */
451         if (estate->es_instrument)
452         {
453                 Instrumentation *instrument;
454                 int                     i;
455
456                 instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
457                 instrumentation->instrument_options = estate->es_instrument;
458                 instrumentation->instrument_offset = instrument_offset;
459                 instrumentation->num_workers = nworkers;
460                 instrumentation->num_plan_nodes = e.nnodes;
461                 instrument = GetInstrumentationArray(instrumentation);
462                 for (i = 0; i < nworkers * e.nnodes; ++i)
463                         InstrInit(&instrument[i], estate->es_instrument);
464                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
465                                            instrumentation);
466                 pei->instrumentation = instrumentation;
467         }
468
469         /*
470          * Give parallel-aware nodes a chance to initialize their shared data.
471          * This also fills in each node's entry in instrumentation->plan_node_id,
472          * if instrumentation is in use.
473          */
474         d.pcxt = pcxt;
475         d.instrumentation = instrumentation;
476         d.nnodes = 0;
477         ExecParallelInitializeDSM(planstate, &d);
478
479         /*
480          * Make sure that the world hasn't shifted under our feet.  This could
481          * probably just be an Assert(), but let's be conservative for now.
482          */
483         if (e.nnodes != d.nnodes)
484                 elog(ERROR, "inconsistent count of PlanState nodes");
485
486         /* OK, we're ready to rock and roll. */
487         return pei;
488 }
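/*
 * For orientation only: at this point the DSM's table of contents holds,
 * keyed by the PARALLEL_KEY_* constants above, the serialized PlannedStmt,
 * the serialized ParamListInfo, a per-worker BufferUsage array, the tuple
 * queues, and, when instrumentation is enabled, the
 * SharedExecutorInstrumentation area.  A worker retrieves an entry with
 * shm_toc_lookup, e.g.:
 *
 *     char *pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
 */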
489
490 /*
491  * Copy instrumentation information about this node and its descendants from
492  * dynamic shared memory.
493  */
494 static bool
495 ExecParallelRetrieveInstrumentation(PlanState *planstate,
496                                                           SharedExecutorInstrumentation *instrumentation)
497 {
498         Instrumentation *instrument;
499         int                     i;
500         int                     n;
501         int                     ibytes;
502         int                     plan_node_id = planstate->plan->plan_node_id;
503         MemoryContext oldcontext;
504
505         /* Find the instrumentation for this node. */
506         for (i = 0; i < instrumentation->num_plan_nodes; ++i)
507                 if (instrumentation->plan_node_id[i] == plan_node_id)
508                         break;
509         if (i >= instrumentation->num_plan_nodes)
510                 elog(ERROR, "plan node %d not found", plan_node_id);
511
512         /* Accumulate the statistics from all workers. */
513         instrument = GetInstrumentationArray(instrumentation);
514         instrument += i * instrumentation->num_workers;
515         for (n = 0; n < instrumentation->num_workers; ++n)
516                 InstrAggNode(planstate->instrument, &instrument[n]);
517
518         /*
519          * Also store the per-worker detail.
520          *
521          * Worker instrumentation should be allocated in the same context as
522          * the regular instrumentation information, which is the per-query
523          * context. Switch into per-query memory context.
524          */
525         oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
526         ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
527         planstate->worker_instrument =
528                 palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
529         MemoryContextSwitchTo(oldcontext);
530
531         planstate->worker_instrument->num_workers = instrumentation->num_workers;
532         memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
533
534         return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
535                                                                  instrumentation);
536 }
537
538 /*
539  * Finish parallel execution.  We wait for parallel workers to finish, and
540  * accumulate their buffer usage and instrumentation.
541  */
542 void
543 ExecParallelFinish(ParallelExecutorInfo *pei)
544 {
545         int                     i;
546
547         if (pei->finished)
548                 return;
549
550         /* First, wait for the workers to finish. */
551         WaitForParallelWorkersToFinish(pei->pcxt);
552
553         /* Next, accumulate buffer usage. */
554         for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
555                 InstrAccumParallelQuery(&pei->buffer_usage[i]);
556
557         /* Finally, accumulate instrumentation, if any. */
558         if (pei->instrumentation)
559                 ExecParallelRetrieveInstrumentation(pei->planstate,
560                                                                                         pei->instrumentation);
561
562         pei->finished = true;
563 }
564
565 /*
566  * Clean up whatever ParallelExecutorInfo resources still exist after
567  * ExecParallelFinish.  We separate these routines because someone might
568  * want to examine the contents of the DSM after ExecParallelFinish and
569  * before calling this routine.
570  */
571 void
572 ExecParallelCleanup(ParallelExecutorInfo *pei)
573 {
574         if (pei->pcxt != NULL)
575         {
576                 DestroyParallelContext(pei->pcxt);
577                 pei->pcxt = NULL;
578         }
579         pfree(pei);
580 }
581
582 /*
583  * Create a DestReceiver to write tuples we produce to the shm_mq designated
584  * for that purpose.
585  */
586 static DestReceiver *
587 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
588 {
589         char       *mqspace;
590         shm_mq     *mq;
591
592         mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE);
593         mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
594         mq = (shm_mq *) mqspace;
595         shm_mq_set_sender(mq, MyProc);
596         return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
597 }
598
599 /*
600  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
601  */
602 static QueryDesc *
603 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
604                                                  int instrument_options)
605 {
606         char       *pstmtspace;
607         char       *paramspace;
608         PlannedStmt *pstmt;
609         ParamListInfo paramLI;
610
611         /* Reconstruct leader-supplied PlannedStmt. */
612         pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
613         pstmt = (PlannedStmt *) stringToNode(pstmtspace);
614
615         /* Reconstruct ParamListInfo. */
616         paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS);
617         paramLI = RestoreParamList(&paramspace);
618
619         /*
620          * Create a QueryDesc for the query.
621          *
622          * It's not obvious how to obtain the query string from here, and even if
623          * we could, copying it would cost more cycles than it would be worth.  But
624          * it's a bit unsatisfying to just use a dummy string here, so consider
625          * revising this someday.
626          */
627         return CreateQueryDesc(pstmt,
628                                                    "<parallel query>",
629                                                    GetActiveSnapshot(), InvalidSnapshot,
630                                                    receiver, paramLI, instrument_options);
631 }
632
633 /*
634  * Copy instrumentation information from this node and its descendants into
635  * dynamic shared memory, so that the parallel leader can retrieve it.
636  */
637 static bool
638 ExecParallelReportInstrumentation(PlanState *planstate,
639                                                           SharedExecutorInstrumentation *instrumentation)
640 {
641         int                     i;
642         int                     plan_node_id = planstate->plan->plan_node_id;
643         Instrumentation *instrument;
644
645         InstrEndLoop(planstate->instrument);
646
647         /*
648          * If we shuffled the plan_node_id values in the shared array into sorted
649          * order, we could use binary search here.  This might matter someday if
650          * we're pushing down sufficiently large plan trees.  For now, do it the
651          * slow, dumb way.
652          */
653         for (i = 0; i < instrumentation->num_plan_nodes; ++i)
654                 if (instrumentation->plan_node_id[i] == plan_node_id)
655                         break;
656         if (i >= instrumentation->num_plan_nodes)
657                 elog(ERROR, "plan node %d not found", plan_node_id);
658
659         /*
660          * Add our statistics to the per-node, per-worker totals.  It's possible
661          * that this could happen more than once if we relaunched workers.
662          */
663         instrument = GetInstrumentationArray(instrumentation);
664         instrument += i * instrumentation->num_workers;
665         Assert(IsParallelWorker());
666         Assert(ParallelWorkerNumber < instrumentation->num_workers);
667         InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
668
669         return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
670                                                                  instrumentation);
671 }
672
673 /*
674  * Initialize the PlanState and its descendants with the information
675  * retrieved from shared memory.  This has to be done once the PlanState
676  * is allocated and initialized by the executor; that is, after ExecutorStart().
677  */
678 static bool
679 ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
680 {
681         if (planstate == NULL)
682                 return false;
683
684         /* Call initializers for parallel-aware plan nodes. */
685         if (planstate->plan->parallel_aware)
686         {
687                 switch (nodeTag(planstate))
688                 {
689                         case T_SeqScanState:
690                                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
691                                 break;
692                         case T_ForeignScanState:
693                                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
694                                                                                                 toc);
695                                 break;
696                         case T_CustomScanState:
697                                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
698                                                                                            toc);
699                                 break;
700                         default:
701                                 break;
702                 }
703         }
704
705         return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
706 }
707
708 /*
709  * Main entrypoint for parallel query worker processes.
710  *
711  * We reach this function from ParallelWorkerMain, so the setup necessary to
712  * create a sensible parallel environment has already been done; that code worries
713  * about stuff like the transaction state, combo CID mappings, and GUC values,
714  * so we don't need to deal with any of that here.
715  *
716  * Our job is to deal with concerns specific to the executor.  The parallel
717  * group leader will have stored a serialized PlannedStmt, and it's our job
718  * to execute that plan and write the resulting tuples to the appropriate
719  * tuple queue.  Various bits of supporting information that we need in order
720  * to do this are also stored in the dsm_segment and can be accessed through
721  * the shm_toc.
722  */
723 static void
724 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
725 {
726         BufferUsage *buffer_usage;
727         DestReceiver *receiver;
728         QueryDesc  *queryDesc;
729         SharedExecutorInstrumentation *instrumentation;
730         int                     instrument_options = 0;
731
732         /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
733         receiver = ExecParallelGetReceiver(seg, toc);
734         instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION);
735         if (instrumentation != NULL)
736                 instrument_options = instrumentation->instrument_options;
737         queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
738
739         /* Prepare to track buffer usage during query execution. */
740         InstrStartParallelQuery();
741
742         /* Start up the executor, have it run the plan, and then shut it down. */
743         ExecutorStart(queryDesc, 0);
744         ExecParallelInitializeWorker(queryDesc->planstate, toc);
745         ExecutorRun(queryDesc, ForwardScanDirection, 0L);
746         ExecutorFinish(queryDesc);
747
748         /* Report buffer usage during parallel execution. */
749         buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
750         InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
751
752         /* Report instrumentation data if any instrumentation options are set. */
753         if (instrumentation != NULL)
754                 ExecParallelReportInstrumentation(queryDesc->planstate,
755                                                                                   instrumentation);
756
757         /* Must do this after capturing instrumentation. */
758         ExecutorEnd(queryDesc);
759
760         /* Cleanup. */
761         FreeQueryDesc(queryDesc);
762         (*receiver->rDestroy) (receiver);
763 }