/*-------------------------------------------------------------------------
 *
 * execParallel.c
 *    Support routines for parallel execution.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This file contains routines that are intended to support setting up,
 * using, and tearing down a ParallelContext from within the PostgreSQL
 * executor.  The ParallelContext machinery will handle starting the
 * workers and ensuring that their state generally matches that of the
 * leader; see src/backend/access/transam/README.parallel for details.
 * However, we must save and restore relevant executor state, such as
 * any ParamListInfo associated with the query, buffer usage info, and
 * the actual plan to be passed down to the worker.
 *
 * IDENTIFICATION
 *    src/backend/executor/execParallel.c
 *
 *-------------------------------------------------------------------------
 */
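
/*
 * A hedged sketch of typical leader-side usage of this module.  The real
 * call sites live in the Gather node (nodeGather.c); the sequence below is
 * illustrative only and omits error handling and the tuple-reading loop:
 *
 *      ParallelExecutorInfo *pei;
 *
 *      pei = ExecInitParallelPlan(planstate, estate, nworkers);
 *      LaunchParallelWorkers(pei->pcxt);
 *      ... read tuples from pei->tqueue[] via the readers in tqueue.c ...
 *      ExecParallelFinish(pei);        wait for workers, collect statistics
 *      ExecParallelCleanup(pei);       tear down the DSM and ParallelContext
 *
 * To rescan the same plan, ExecParallelReinitialize() can be used between
 * executions instead of rebuilding everything from scratch.
 */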

#include "postgres.h"

#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"

/*
 * Magic numbers for parallel executor communication.  We use constants
 * greater than any 32-bit integer here so that values < 2^32 can be used
 * by individual parallel nodes to store their own state.
 */
#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000001)
#define PARALLEL_KEY_PARAMS             UINT64CONST(0xE000000000000002)
#define PARALLEL_KEY_BUFFER_USAGE       UINT64CONST(0xE000000000000003)
#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000005)
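
/*
 * A hedged illustration of the key-space convention described above: a
 * parallel-aware node can use a small integer of its own -- in practice its
 * plan_node_id -- as a shm_toc key without colliding with the 64-bit
 * executor keys.  "Foo" below is a hypothetical node type; nodeSeqscan.c
 * follows this pattern for real:
 *
 *      (in ExecFooEstimate)
 *      shm_toc_estimate_chunk(&pcxt->estimator, size);
 *      shm_toc_estimate_keys(&pcxt->estimator, 1);
 *
 *      (in ExecFooInitializeDSM)
 *      shared_state = shm_toc_allocate(pcxt->toc, size);
 *      shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, shared_state);
 *
 *      (in ExecFooInitializeWorker)
 *      shared_state = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
 */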

#define PARALLEL_TUPLE_QUEUE_SIZE       65536

/*
 * DSM structure for accumulating per-PlanState instrumentation.
 *
 * instrument_options: Same meaning here as in instrument.c.
 *
 * instrument_offset: Offset, relative to the start of this structure,
 * of the first Instrumentation object.  This will depend on the length of
 * the plan_node_id array.
 *
 * num_workers: Number of workers.
 *
 * num_plan_nodes: Number of plan nodes.
 *
 * plan_node_id: Array of plan nodes for which we are gathering instrumentation
 * from parallel workers.  The length of this array is given by num_plan_nodes.
 */
struct SharedExecutorInstrumentation
{
    int         instrument_options;
    int         instrument_offset;
    int         num_workers;
    int         num_plan_nodes;
    int         plan_node_id[FLEXIBLE_ARRAY_MEMBER];
    /* array of num_plan_nodes * num_workers Instrumentation objects follows */
};
#define GetInstrumentationArray(sei) \
    (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     (Instrumentation *) (((char *) sei) + sei->instrument_offset))

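/*
 * The Instrumentation array that follows the struct holds all workers'
 * entries for plan node 0 first, then all entries for plan node 1, and so
 * on.  A sketch of the indexing used by the retrieval and report functions
 * further down in this file:
 *
 *      Instrumentation *array = GetInstrumentationArray(sei);
 *      Instrumentation *slot =
 *          &array[plan_node_index * sei->num_workers + worker_number];
 *
 * where plan_node_index is the position of the node's plan_node_id in the
 * plan_node_id[] array and worker_number is in [0, num_workers).
 */
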
/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
{
    ParallelContext *pcxt;
    int         nnodes;
} ExecParallelEstimateContext;

/* Context object for ExecParallelInitializeDSM. */
typedef struct ExecParallelInitializeDSMContext
{
    ParallelContext *pcxt;
    SharedExecutorInstrumentation *instrumentation;
    int         nnodes;
} ExecParallelInitializeDSMContext;

/* Helper functions that run in the parallel leader. */
static char *ExecSerializePlan(Plan *plan, EState *estate);
static bool ExecParallelEstimate(PlanState *node,
                     ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
                          ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
                             bool reinitialize);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
                             SharedExecutorInstrumentation *instrumentation);

/* Helper functions that run in the parallel worker. */
static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);

/*
 * Create a serialized representation of the plan to be sent to each worker.
 */
static char *
ExecSerializePlan(Plan *plan, EState *estate)
{
    PlannedStmt *pstmt;
    ListCell   *tlist;

    /* We can't scribble on the original plan, so make a copy. */
    plan = copyObject(plan);

    /*
     * The worker will start its own copy of the executor, and that copy will
     * insert a junk filter if the toplevel node has any resjunk entries. We
     * don't want that to happen, because while resjunk columns shouldn't be
     * sent back to the user, here the tuples are coming back to another
     * backend which may very well need them.  So mutate the target list
     * accordingly.  This is sort of a hack; there might be better ways to do
     * this...
     */
    foreach(tlist, plan->targetlist)
    {
        TargetEntry *tle = (TargetEntry *) lfirst(tlist);

        tle->resjunk = false;
    }

    /*
     * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
     * for our purposes, but the worker will need at least a minimal
     * PlannedStmt to start the executor.
     */
    pstmt = makeNode(PlannedStmt);
    pstmt->commandType = CMD_SELECT;
    pstmt->queryId = 0;
    pstmt->hasReturning = 0;
    pstmt->hasModifyingCTE = 0;
    pstmt->canSetTag = 1;
    pstmt->transientPlan = 0;
    pstmt->planTree = plan;
    pstmt->rtable = estate->es_range_table;
    pstmt->resultRelations = NIL;
    pstmt->utilityStmt = NULL;
    pstmt->subplans = NIL;
    pstmt->rewindPlanIDs = NULL;
    pstmt->rowMarks = NIL;
    pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
    pstmt->relationOids = NIL;
    pstmt->invalItems = NIL;    /* workers can't replan anyway... */
    pstmt->hasRowSecurity = false;
    pstmt->hasForeignJoin = false;

    /* Return serialized copy of our dummy PlannedStmt. */
    return nodeToString(pstmt);
}

/*
 * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
 * may need some state which is shared across all parallel workers.  Before
 * we size the DSM, give them a chance to call shm_toc_estimate_chunk or
 * shm_toc_estimate_keys on &pcxt->estimator.
 *
 * While we're at it, count the number of PlanState nodes in the tree, so
 * we know how many entries the shared instrumentation structure will need.
 */
static bool
ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
{
    if (planstate == NULL)
        return false;

    /* Count this node. */
    e->nnodes++;

    /* Call estimators for parallel-aware nodes. */
    if (planstate->plan->parallel_aware)
    {
        switch (nodeTag(planstate))
        {
            case T_SeqScanState:
                ExecSeqScanEstimate((SeqScanState *) planstate,
                                    e->pcxt);
                break;
            case T_ForeignScanState:
                ExecForeignScanEstimate((ForeignScanState *) planstate,
                                        e->pcxt);
                break;
            case T_CustomScanState:
                ExecCustomScanEstimate((CustomScanState *) planstate,
                                       e->pcxt);
                break;
            default:
                break;
        }
    }

    return planstate_tree_walker(planstate, ExecParallelEstimate, e);
}

/*
 * Initialize the dynamic shared memory segment that will be used to control
 * parallel execution.
 */
static bool
ExecParallelInitializeDSM(PlanState *planstate,
                          ExecParallelInitializeDSMContext *d)
{
    if (planstate == NULL)
        return false;

    /* If instrumentation is enabled, initialize slot for this node. */
    if (d->instrumentation != NULL)
        d->instrumentation->plan_node_id[d->nnodes] =
            planstate->plan->plan_node_id;

    /* Count this node. */
    d->nnodes++;

    /*
     * Call initializers for parallel-aware plan nodes.
     *
     * Ordinary plan nodes won't do anything here, but parallel-aware plan
     * nodes may need to initialize shared state in the DSM before parallel
     * workers are available.  They can allocate the space they previously
     * estimated using shm_toc_allocate, and add the keys they previously
     * estimated using shm_toc_insert, in each case targeting pcxt->toc.
     */
    if (planstate->plan->parallel_aware)
    {
        switch (nodeTag(planstate))
        {
            case T_SeqScanState:
                ExecSeqScanInitializeDSM((SeqScanState *) planstate,
                                         d->pcxt);
                break;
            case T_ForeignScanState:
                ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
                                             d->pcxt);
                break;
            case T_CustomScanState:
                ExecCustomScanInitializeDSM((CustomScanState *) planstate,
                                            d->pcxt);
                break;
            default:
                break;
        }
    }

    return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
}

/*
 * Set up the response queues that parallel workers will use to return
 * tuples to the leader backend.  The leader is attached as the receiver
 * on each queue; the workers themselves are launched elsewhere.
 */
static shm_mq_handle **
ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
    shm_mq_handle **responseq;
    char       *tqueuespace;
    int         i;

    /* Skip this if no workers. */
    if (pcxt->nworkers == 0)
        return NULL;

    /* Allocate memory for shared memory queue handles. */
    responseq = (shm_mq_handle **)
        palloc(pcxt->nworkers * sizeof(shm_mq_handle *));

    /*
     * If not reinitializing, allocate space from the DSM for the queues;
     * otherwise, find the already allocated space.
     */
    if (!reinitialize)
        tqueuespace =
            shm_toc_allocate(pcxt->toc,
                             mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
                                      pcxt->nworkers));
    else
        tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);

    /* Create the queues, and become the receiver for each. */
    for (i = 0; i < pcxt->nworkers; ++i)
    {
        shm_mq     *mq;

        mq = shm_mq_create(tqueuespace +
                           ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
                           (Size) PARALLEL_TUPLE_QUEUE_SIZE);

        shm_mq_set_receiver(mq, MyProc);
        responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
    }

    /* Add array of queues to shm_toc, so others can find it. */
    if (!reinitialize)
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);

    /* Return array of handles. */
    return responseq;
}

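/*
 * Background note (hedged, not enforced by this file): the handles returned
 * above are the leader's ends of the queues; each worker attaches to its own
 * queue as the sender via ExecParallelGetReceiver() below.  In this tree the
 * Gather node (nodeGather.c) is the expected consumer of these handles,
 * wrapping them in the TupleQueueReader facilities from tqueue.c.
 */
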
/*
 * Re-initialize the parallel executor info such that it can be reused by
 * workers.
 */
void
ExecParallelReinitialize(ParallelExecutorInfo *pei)
{
    ReinitializeParallelDSM(pei->pcxt);
    pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
    pei->finished = false;
}

/*
 * Sets up the required infrastructure for backend workers to perform
 * execution and return results to the main backend.
 */
ParallelExecutorInfo *
ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
{
    ParallelExecutorInfo *pei;
    ParallelContext *pcxt;
    ExecParallelEstimateContext e;
    ExecParallelInitializeDSMContext d;
    char       *pstmt_data;
    char       *pstmt_space;
    char       *param_space;
    BufferUsage *bufusage_space;
    SharedExecutorInstrumentation *instrumentation = NULL;
    int         pstmt_len;
    int         param_len;
    int         instrumentation_len = 0;
    int         instrument_offset = 0;

    /* Allocate object for return value. */
    pei = palloc0(sizeof(ParallelExecutorInfo));
    pei->finished = false;
    pei->planstate = planstate;

    /* Fix up and serialize plan to be sent to workers. */
    pstmt_data = ExecSerializePlan(planstate->plan, estate);

    /* Create a parallel context. */
    pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
    pei->pcxt = pcxt;

    /*
     * Before telling the parallel context to create a dynamic shared memory
     * segment, we need to figure out how big it should be.  Estimate space
     * for the various things we need to store.
     */

    /* Estimate space for serialized PlannedStmt. */
    pstmt_len = strlen(pstmt_data) + 1;
    shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Estimate space for serialized ParamListInfo. */
    param_len = EstimateParamListSpace(estate->es_param_list_info);
    shm_toc_estimate_chunk(&pcxt->estimator, param_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Estimate space for BufferUsage.
     *
     * If EXPLAIN is not in use and there are no extensions loaded that care,
     * we could skip this.  But we have no way of knowing whether anyone's
     * looking at pgBufferUsage, so do it unconditionally.
     */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Estimate space for tuple queues. */
    shm_toc_estimate_chunk(&pcxt->estimator,
                        mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Give parallel-aware nodes a chance to add to the estimates, and get a
     * count of how many PlanState nodes there are.
     */
    e.pcxt = pcxt;
    e.nnodes = 0;
    ExecParallelEstimate(planstate, &e);

    /* Estimate space for instrumentation, if required. */
    if (estate->es_instrument)
    {
        instrumentation_len =
            offsetof(SharedExecutorInstrumentation, plan_node_id) +
            sizeof(int) * e.nnodes;
        instrumentation_len = MAXALIGN(instrumentation_len);
        instrument_offset = instrumentation_len;
        instrumentation_len +=
            mul_size(sizeof(Instrumentation),
                     mul_size(e.nnodes, nworkers));
        shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }

    /* Everyone's had a chance to ask for space, so now create the DSM. */
    InitializeParallelDSM(pcxt);

    /*
     * OK, now we have a dynamic shared memory segment, and it should be big
     * enough to store all of the data we estimated we would want to put into
     * it, plus whatever general stuff (not specifically executor-related) the
     * ParallelContext itself needs to store there.  None of the space we
     * asked for has been allocated or initialized yet, though, so do that.
     */

    /* Store serialized PlannedStmt. */
    pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
    memcpy(pstmt_space, pstmt_data, pstmt_len);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);

    /* Store serialized ParamListInfo. */
    param_space = shm_toc_allocate(pcxt->toc, param_len);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
    SerializeParamList(estate->es_param_list_info, &param_space);

    /* Allocate space for each worker's BufferUsage; no need to initialize. */
    bufusage_space = shm_toc_allocate(pcxt->toc,
                              mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
    pei->buffer_usage = bufusage_space;

    /* Set up tuple queues. */
    pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);

    /*
     * If instrumentation options were supplied, allocate space for the data.
     * It only gets partially initialized here; the rest happens during
     * ExecParallelInitializeDSM.
     */
    if (estate->es_instrument)
    {
        Instrumentation *instrument;
        int         i;

        instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
        instrumentation->instrument_options = estate->es_instrument;
        instrumentation->instrument_offset = instrument_offset;
        instrumentation->num_workers = nworkers;
        instrumentation->num_plan_nodes = e.nnodes;
        instrument = GetInstrumentationArray(instrumentation);
        for (i = 0; i < nworkers * e.nnodes; ++i)
            InstrInit(&instrument[i], estate->es_instrument);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
                       instrumentation);
        pei->instrumentation = instrumentation;
    }

    /*
     * Give parallel-aware nodes a chance to initialize their shared data.
     * This also fills in the plan_node_id entries of the shared
     * instrumentation structure, if instrumentation is in use.
     */
    d.pcxt = pcxt;
    d.instrumentation = instrumentation;
    d.nnodes = 0;
    ExecParallelInitializeDSM(planstate, &d);

    /*
     * Make sure that the world hasn't shifted under our feet.  This could
     * probably just be an Assert(), but let's be conservative for now.
     */
    if (e.nnodes != d.nnodes)
        elog(ERROR, "inconsistent count of PlanState nodes");

    /* OK, we're ready to rock and roll. */
    return pei;
}

/*
 * Copy instrumentation information about this node and its descendants from
 * dynamic shared memory.
 */
static bool
ExecParallelRetrieveInstrumentation(PlanState *planstate,
                          SharedExecutorInstrumentation *instrumentation)
{
    Instrumentation *instrument;
    int         i;
    int         n;
    int         ibytes;
    int         plan_node_id = planstate->plan->plan_node_id;

    /* Find the instrumentation for this node. */
    for (i = 0; i < instrumentation->num_plan_nodes; ++i)
        if (instrumentation->plan_node_id[i] == plan_node_id)
            break;
    if (i >= instrumentation->num_plan_nodes)
        elog(ERROR, "plan node %d not found", plan_node_id);

    /* Accumulate the statistics from all workers. */
    instrument = GetInstrumentationArray(instrumentation);
    instrument += i * instrumentation->num_workers;
    for (n = 0; n < instrumentation->num_workers; ++n)
        InstrAggNode(planstate->instrument, &instrument[n]);

    /* Also store the per-worker detail. */
    ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
    planstate->worker_instrument =
        palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
    planstate->worker_instrument->num_workers = instrumentation->num_workers;
    memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);

    return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
                                 instrumentation);
}

/*
 * Finish parallel execution.  We wait for parallel workers to finish, and
 * accumulate their buffer usage and instrumentation.
 */
void
ExecParallelFinish(ParallelExecutorInfo *pei)
{
    int         i;

    if (pei->finished)
        return;

    /* First, wait for the workers to finish. */
    WaitForParallelWorkersToFinish(pei->pcxt);

    /* Next, accumulate buffer usage. */
    for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
        InstrAccumParallelQuery(&pei->buffer_usage[i]);

    /* Finally, accumulate instrumentation, if any. */
    if (pei->instrumentation)
        ExecParallelRetrieveInstrumentation(pei->planstate,
                                            pei->instrumentation);

    pei->finished = true;
}

/*
 * Clean up whatever ParallelExecutorInfo resources still exist after
 * ExecParallelFinish.  We separate these routines because someone might
 * want to examine the contents of the DSM after ExecParallelFinish and
 * before calling this routine.
 */
void
ExecParallelCleanup(ParallelExecutorInfo *pei)
{
    if (pei->pcxt != NULL)
    {
        DestroyParallelContext(pei->pcxt);
        pei->pcxt = NULL;
    }
    pfree(pei);
}

/*
 * Create a DestReceiver to write tuples we produce to the shm_mq designated
 * for that purpose.
 */
static DestReceiver *
ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
{
    char       *mqspace;
    shm_mq     *mq;

    mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE);
    mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
    mq = (shm_mq *) mqspace;
    shm_mq_set_sender(mq, MyProc);
    return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}

/*
 * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
 */
static QueryDesc *
ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
                         int instrument_options)
{
    char       *pstmtspace;
    char       *paramspace;
    PlannedStmt *pstmt;
    ParamListInfo paramLI;

    /* Reconstruct leader-supplied PlannedStmt. */
    pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
    pstmt = (PlannedStmt *) stringToNode(pstmtspace);

    /* Reconstruct ParamListInfo. */
    paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS);
    paramLI = RestoreParamList(&paramspace);

    /*
     * Create a QueryDesc for the query.
     *
     * It's not obvious how to obtain the query string from here; and even if
     * we could, copying it would take more cycles than not copying it.  But
     * it's a bit unsatisfying to just use a dummy string here, so consider
     * revising this someday.
     */
    return CreateQueryDesc(pstmt,
                           "<parallel query>",
                           GetActiveSnapshot(), InvalidSnapshot,
                           receiver, paramLI, instrument_options);
}

/*
 * Copy instrumentation information from this node and its descendants into
 * dynamic shared memory, so that the parallel leader can retrieve it.
 */
static bool
ExecParallelReportInstrumentation(PlanState *planstate,
                          SharedExecutorInstrumentation *instrumentation)
{
    int         i;
    int         plan_node_id = planstate->plan->plan_node_id;
    Instrumentation *instrument;

    InstrEndLoop(planstate->instrument);

    /*
     * If we kept the plan_node_id array in sorted order, we could use binary
     * search here.  This might matter someday if we're pushing down
     * sufficiently large plan trees.  For now, do it the slow, dumb way.
     */
    for (i = 0; i < instrumentation->num_plan_nodes; ++i)
        if (instrumentation->plan_node_id[i] == plan_node_id)
            break;
    if (i >= instrumentation->num_plan_nodes)
        elog(ERROR, "plan node %d not found", plan_node_id);

    /*
     * Add our statistics to the per-node, per-worker totals.  It's possible
     * that this could happen more than once if we relaunched workers.
     */
    instrument = GetInstrumentationArray(instrumentation);
    instrument += i * instrumentation->num_workers;
    Assert(IsParallelWorker());
    Assert(ParallelWorkerNumber < instrumentation->num_workers);
    InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);

    return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
                                 instrumentation);
}

/*
 * Initialize the PlanState and its descendants with the information
 * retrieved from shared memory.  This has to be done once the PlanState
 * has been allocated and initialized by the executor; that is, after
 * ExecutorStart().
 */
static bool
ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
{
    if (planstate == NULL)
        return false;

    /* Call initializers for parallel-aware plan nodes. */
    if (planstate->plan->parallel_aware)
    {
        switch (nodeTag(planstate))
        {
            case T_SeqScanState:
                ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
                break;
            case T_ForeignScanState:
                ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
                                                toc);
                break;
            case T_CustomScanState:
                ExecCustomScanInitializeWorker((CustomScanState *) planstate,
                                               toc);
                break;
            default:
                break;
        }
    }

    return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
}

/*
 * Main entrypoint for parallel query worker processes.
 *
 * We reach this function from ParallelWorkerMain, so the setup necessary to
 * create a sensible parallel environment has already been done;
 * ParallelWorkerMain worries about stuff like the transaction state, combo
 * CID mappings, and GUC values, so we don't need to deal with any of that
 * here.
 *
 * Our job is to deal with concerns specific to the executor.  The parallel
 * group leader will have stored a serialized PlannedStmt, and it's our job
 * to execute that plan and write the resulting tuples to the appropriate
 * tuple queue.  Various bits of supporting information that we need in order
 * to do this are also stored in the dsm_segment and can be accessed through
 * the shm_toc.
 */
static void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
    BufferUsage *buffer_usage;
    DestReceiver *receiver;
    QueryDesc  *queryDesc;
    SharedExecutorInstrumentation *instrumentation;
    int         instrument_options = 0;

    /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
    receiver = ExecParallelGetReceiver(seg, toc);
    instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION);
    if (instrumentation != NULL)
        instrument_options = instrumentation->instrument_options;
    queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);

    /* Prepare to track buffer usage during query execution. */
    InstrStartParallelQuery();

    /* Start up the executor, have it run the plan, and then shut it down. */
    ExecutorStart(queryDesc, 0);
    ExecParallelInitializeWorker(queryDesc->planstate, toc);
    ExecutorRun(queryDesc, ForwardScanDirection, 0L);
    ExecutorFinish(queryDesc);

    /* Report buffer usage during parallel execution. */
    buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);

    /* Report instrumentation data if any instrumentation options are set. */
    if (instrumentation != NULL)
        ExecParallelReportInstrumentation(queryDesc->planstate,
                                          instrumentation);

    /* Must do this after capturing instrumentation. */
    ExecutorEnd(queryDesc);

    /* Cleanup. */
    FreeQueryDesc(queryDesc);
    (*receiver->rDestroy) (receiver);
}