1 /*-------------------------------------------------------------------------
2  *
3  * execParallel.c
4  *        Support routines for parallel execution.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * This file contains routines that are intended to support setting up,
10  * using, and tearing down a ParallelContext from within the PostgreSQL
11  * executor.  The ParallelContext machinery will handle starting the
12  * workers and ensuring that their state generally matches that of the
13  * leader; see src/backend/access/transam/README.parallel for details.
14  * However, we must save and restore relevant executor state, such as
15  * any ParamListInfo associated with the query, buffer usage info, and
16  * the actual plan to be passed down to the worker.
17  *
18  * IDENTIFICATION
19  *        src/backend/executor/execParallel.c
20  *
21  *-------------------------------------------------------------------------
22  */
23
24 #include "postgres.h"
25
26 #include "executor/execParallel.h"
27 #include "executor/executor.h"
28 #include "executor/nodeBitmapHeapscan.h"
29 #include "executor/nodeCustom.h"
30 #include "executor/nodeForeignscan.h"
31 #include "executor/nodeSeqscan.h"
32 #include "executor/nodeIndexscan.h"
33 #include "executor/nodeIndexonlyscan.h"
34 #include "executor/tqueue.h"
35 #include "nodes/nodeFuncs.h"
36 #include "optimizer/planmain.h"
37 #include "optimizer/planner.h"
38 #include "storage/spin.h"
39 #include "tcop/tcopprot.h"
40 #include "utils/dsa.h"
41 #include "utils/memutils.h"
42 #include "utils/snapmgr.h"
43 #include "pgstat.h"
44
45 /*
46  * Magic numbers for parallel executor communication.  We use constants
47  * greater than any 32-bit integer here so that values < 2^32 can be used
48  * by individual parallel nodes to store their own state.
49  */
50 #define PARALLEL_KEY_PLANNEDSTMT                UINT64CONST(0xE000000000000001)
51 #define PARALLEL_KEY_PARAMS                             UINT64CONST(0xE000000000000002)
52 #define PARALLEL_KEY_BUFFER_USAGE               UINT64CONST(0xE000000000000003)
53 #define PARALLEL_KEY_TUPLE_QUEUE                UINT64CONST(0xE000000000000004)
54 #define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000005)
55 #define PARALLEL_KEY_DSA                                UINT64CONST(0xE000000000000006)
56 #define PARALLEL_KEY_QUERY_TEXT         UINT64CONST(0xE000000000000007)
57
58 #define PARALLEL_TUPLE_QUEUE_SIZE               65536
59
60 /*
61  * DSM structure for accumulating per-PlanState instrumentation.
62  *
63  * instrument_options: Same meaning here as in instrument.c.
64  *
65  * instrument_offset: Offset, relative to the start of this structure,
66  * of the first Instrumentation object.  This will depend on the length of
67  * the plan_node_id array.
68  *
69  * num_workers: Number of workers.
70  *
71  * num_plan_nodes: Number of plan nodes.
72  *
73  * plan_node_id: Array of plan node IDs for which we are gathering instrumentation
74  * from parallel workers.  The length of this array is given by num_plan_nodes.
75  */
76 struct SharedExecutorInstrumentation
77 {
78         int                     instrument_options;
79         int                     instrument_offset;
80         int                     num_workers;
81         int                     num_plan_nodes;
82         int                     plan_node_id[FLEXIBLE_ARRAY_MEMBER];
83         /* array of num_plan_nodes * num_workers Instrumentation objects follows */
84 };
85 #define GetInstrumentationArray(sei) \
86         (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
87          (Instrumentation *) (((char *) sei) + sei->instrument_offset))
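
/*
 * Illustrative helper, not used elsewhere in this file: the array returned by
 * GetInstrumentationArray() holds num_workers consecutive Instrumentation
 * slots per plan node, in the same order as plan_node_id[].  The hypothetical
 * function below shows how a particular slot is addressed;
 * ExecParallelRetrieveInstrumentation and ExecParallelReportInstrumentation
 * below do the same arithmetic inline.
 */
static inline Instrumentation *
GetWorkerInstrumentation(SharedExecutorInstrumentation *sei,
                         int plan_node_index, int worker_number)
{
        /* plan_node_index indexes plan_node_id[]; worker_number < num_workers */
        return &GetInstrumentationArray(sei)[plan_node_index * sei->num_workers +
                                             worker_number];
}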
88
89 /* Context object for ExecParallelEstimate. */
90 typedef struct ExecParallelEstimateContext
91 {
92         ParallelContext *pcxt;
93         int                     nnodes;
94 } ExecParallelEstimateContext;
95
96 /* Context object for ExecParallelInitializeDSM. */
97 typedef struct ExecParallelInitializeDSMContext
98 {
99         ParallelContext *pcxt;
100         SharedExecutorInstrumentation *instrumentation;
101         int                     nnodes;
102 } ExecParallelInitializeDSMContext;
103
104 /* Helper functions that run in the parallel leader. */
105 static char *ExecSerializePlan(Plan *plan, EState *estate);
106 static bool ExecParallelEstimate(PlanState *node,
107                                          ExecParallelEstimateContext *e);
108 static bool ExecParallelInitializeDSM(PlanState *node,
109                                                   ExecParallelInitializeDSMContext *d);
110 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
111                                                          bool reinitialize);
112 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
113                                                          SharedExecutorInstrumentation *instrumentation);
114
115 /* Helper functions that run in the parallel worker. */
116 static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
117 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
118
119 /*
120  * Create a serialized representation of the plan to be sent to each worker.
121  */
122 static char *
123 ExecSerializePlan(Plan *plan, EState *estate)
124 {
125         PlannedStmt *pstmt;
126         ListCell   *tlist;
127
128         /* We can't scribble on the original plan, so make a copy. */
129         plan = copyObject(plan);
130
131         /*
132          * The worker will start its own copy of the executor, and that copy will
133          * insert a junk filter if the toplevel node has any resjunk entries. We
134          * don't want that to happen, because while resjunk columns shouldn't be
135          * sent back to the user, here the tuples are coming back to another
136          * backend which may very well need them.  So mutate the target list
137          * accordingly.  This is sort of a hack; there might be better ways to do
138          * this...
139          */
140         foreach(tlist, plan->targetlist)
141         {
142                 TargetEntry *tle = (TargetEntry *) lfirst(tlist);
143
144                 tle->resjunk = false;
145         }
146
147         /*
148          * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
149          * for our purposes, but the worker will need at least a minimal
150          * PlannedStmt to start the executor.
151          */
152         pstmt = makeNode(PlannedStmt);
153         pstmt->commandType = CMD_SELECT;
154         pstmt->queryId = 0;
155         pstmt->hasReturning = false;
156         pstmt->hasModifyingCTE = false;
157         pstmt->canSetTag = true;
158         pstmt->transientPlan = false;
159         pstmt->dependsOnRole = false;
160         pstmt->parallelModeNeeded = false;
161         pstmt->planTree = plan;
162         pstmt->rtable = estate->es_range_table;
163         pstmt->resultRelations = NIL;
164         pstmt->nonleafResultRelations = NIL;
165         pstmt->subplans = estate->es_plannedstmt->subplans;
166         pstmt->rewindPlanIDs = NULL;
167         pstmt->rowMarks = NIL;
168         pstmt->relationOids = NIL;
169         pstmt->invalItems = NIL;        /* workers can't replan anyway... */
170         pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
171         pstmt->utilityStmt = NULL;
172         pstmt->stmt_location = -1;
173         pstmt->stmt_len = -1;
174
175         /* Return serialized copy of our dummy PlannedStmt. */
176         return nodeToString(pstmt);
177 }
178
179 /*
180  * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
181  * may need some state which is shared across all parallel workers.  Before
182  * we size the DSM, give them a chance to call shm_toc_estimate_chunk or
183  * shm_toc_estimate_keys on &pcxt->estimator.
184  *
185  * While we're at it, count the number of PlanState nodes in the tree, so
186  * we know how many Instrumentation structures we need.
187  */
188 static bool
189 ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
190 {
191         if (planstate == NULL)
192                 return false;
193
194         /* Count this node. */
195         e->nnodes++;
196
197         /* Call estimators for parallel-aware nodes. */
198         if (planstate->plan->parallel_aware)
199         {
200                 switch (nodeTag(planstate))
201                 {
202                         case T_SeqScanState:
203                                 ExecSeqScanEstimate((SeqScanState *) planstate,
204                                                                         e->pcxt);
205                                 break;
206                         case T_IndexScanState:
207                                 ExecIndexScanEstimate((IndexScanState *) planstate,
208                                                                           e->pcxt);
209                                 break;
210                         case T_IndexOnlyScanState:
211                                 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
212                                                                                   e->pcxt);
213                                 break;
214                         case T_ForeignScanState:
215                                 ExecForeignScanEstimate((ForeignScanState *) planstate,
216                                                                                 e->pcxt);
217                                 break;
218                         case T_CustomScanState:
219                                 ExecCustomScanEstimate((CustomScanState *) planstate,
220                                                                            e->pcxt);
221                                 break;
222                         case T_BitmapHeapScanState:
223                                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
224                                                                            e->pcxt);
225                                 break;
226                         default:
227                                 break;
228                 }
229         }
230
231         return planstate_tree_walker(planstate, ExecParallelEstimate, e);
232 }
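
/*
 * For illustration: a node-level estimate callback such as ExecSeqScanEstimate
 * normally just reserves space for that node's shared state plus one toc key.
 * A minimal sketch, with the hypothetical names SomeScanState and
 * SharedSomeScanInfo standing in for a real node's types:
 *
 *		void
 *		ExecSomeScanEstimate(SomeScanState *node, ParallelContext *pcxt)
 *		{
 *			node->shared_info_len = sizeof(SharedSomeScanInfo);
 *			shm_toc_estimate_chunk(&pcxt->estimator, node->shared_info_len);
 *			shm_toc_estimate_keys(&pcxt->estimator, 1);
 *		}
 */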
233
234 /*
235  * Initialize the dynamic shared memory segment that will be used to control
236  * parallel execution.
237  */
238 static bool
239 ExecParallelInitializeDSM(PlanState *planstate,
240                                                   ExecParallelInitializeDSMContext *d)
241 {
242         if (planstate == NULL)
243                 return false;
244
245         /* If instrumentation is enabled, initialize slot for this node. */
246         if (d->instrumentation != NULL)
247                 d->instrumentation->plan_node_id[d->nnodes] =
248                         planstate->plan->plan_node_id;
249
250         /* Count this node. */
251         d->nnodes++;
252
253         /*
254          * Call initializers for parallel-aware plan nodes.
255          *
256          * Ordinary plan nodes won't do anything here, but parallel-aware plan
257          * nodes may need to initialize shared state in the DSM before parallel
258          * workers are available.  They can allocate the space they previously
259          * estimated using shm_toc_allocate, and add the keys they previously
260          * estimated using shm_toc_insert, in each case targeting pcxt->toc.
261          */
262         if (planstate->plan->parallel_aware)
263         {
264                 switch (nodeTag(planstate))
265                 {
266                         case T_SeqScanState:
267                                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
268                                                                                  d->pcxt);
269                                 break;
270                         case T_IndexScanState:
271                                 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
272                                                                                    d->pcxt);
273                                 break;
274                         case T_IndexOnlyScanState:
275                                 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
276                                                                                            d->pcxt);
277                                 break;
278                         case T_ForeignScanState:
279                                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
280                                                                                          d->pcxt);
281                                 break;
282                         case T_CustomScanState:
283                                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
284                                                                                         d->pcxt);
285                                 break;
286                         case T_BitmapHeapScanState:
287                                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
288                                                                                         d->pcxt);
289                                 break;
290
291                         default:
292                                 break;
293                 }
294         }
295
296         return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
297 }
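
/*
 * For illustration: the matching node-level InitializeDSM callback allocates
 * the space it estimated above and publishes it in the toc.  Parallel-aware
 * scan nodes key such entries by their plan_node_id, which is why the
 * executor-level PARALLEL_KEY_* values above are all >= 2^32.  A minimal
 * sketch, continuing the hypothetical names used above:
 *
 *		void
 *		ExecSomeScanInitializeDSM(SomeScanState *node, ParallelContext *pcxt)
 *		{
 *			SharedSomeScanInfo *info;
 *
 *			info = shm_toc_allocate(pcxt->toc, node->shared_info_len);
 *			... initialize the shared state in *info ...
 *			shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, info);
 *		}
 */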
298
299 /*
300  * Set up the response queues that parallel workers will use to return
301  * tuples to the main backend.
302  */
303 static shm_mq_handle **
304 ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
305 {
306         shm_mq_handle **responseq;
307         char       *tqueuespace;
308         int                     i;
309
310         /* Skip this if no workers. */
311         if (pcxt->nworkers == 0)
312                 return NULL;
313
314         /* Allocate memory for shared memory queue handles. */
315         responseq = (shm_mq_handle **)
316                 palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
317
318         /*
319          * If not reinitializing, allocate space from the DSM for the queues;
320          * otherwise, find the already allocated space.
321          */
322         if (!reinitialize)
323                 tqueuespace =
324                         shm_toc_allocate(pcxt->toc,
325                                                          mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
326                                                                           pcxt->nworkers));
327         else
328                 tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
329
330         /* Create the queues, and become the receiver for each. */
331         for (i = 0; i < pcxt->nworkers; ++i)
332         {
333                 shm_mq     *mq;
334
335                 mq = shm_mq_create(tqueuespace +
336                                                    ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
337                                                    (Size) PARALLEL_TUPLE_QUEUE_SIZE);
338
339                 shm_mq_set_receiver(mq, MyProc);
340                 responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
341         }
342
343         /* Add array of queues to shm_toc, so others can find it. */
344         if (!reinitialize)
345                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
346
347         /* Return array of handles. */
348         return responseq;
349 }
350
351 /*
352  * Re-initialize the parallel executor info such that it can be reused by
353  * workers.
354  */
355 void
356 ExecParallelReinitialize(ParallelExecutorInfo *pei)
357 {
358         ReinitializeParallelDSM(pei->pcxt);
359         pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
360         pei->finished = false;
361 }
362
363 /*
364  * Sets up the required infrastructure for backend workers to perform
365  * execution and return results to the main backend.
366  */
367 ParallelExecutorInfo *
368 ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
369 {
370         ParallelExecutorInfo *pei;
371         ParallelContext *pcxt;
372         ExecParallelEstimateContext e;
373         ExecParallelInitializeDSMContext d;
374         char       *pstmt_data;
375         char       *pstmt_space;
376         char       *param_space;
377         BufferUsage *bufusage_space;
378         SharedExecutorInstrumentation *instrumentation = NULL;
379         int                     pstmt_len;
380         int                     param_len;
381         int                     instrumentation_len = 0;
382         int                     instrument_offset = 0;
383         Size            dsa_minsize = dsa_minimum_size();
384         char       *query_string;
385         int                     query_len;
386
387         /* Allocate object for return value. */
388         pei = palloc0(sizeof(ParallelExecutorInfo));
389         pei->finished = false;
390         pei->planstate = planstate;
391
392         /* Fix up and serialize plan to be sent to workers. */
393         pstmt_data = ExecSerializePlan(planstate->plan, estate);
394
395         /* Create a parallel context. */
396         pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
397         pei->pcxt = pcxt;
398
399         /*
400          * Before telling the parallel context to create a dynamic shared memory
401          * segment, we need to figure out how big it should be.  Estimate space
402          * for the various things we need to store.
403          */
404
405         /* Estimate space for query text. */
406         query_len = strlen(estate->es_sourceText) + 1;  /* include trailing NUL */
407         shm_toc_estimate_chunk(&pcxt->estimator, query_len);
408         shm_toc_estimate_keys(&pcxt->estimator, 1);
409
410         /* Estimate space for serialized PlannedStmt. */
411         pstmt_len = strlen(pstmt_data) + 1;
412         shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
413         shm_toc_estimate_keys(&pcxt->estimator, 1);
414
415         /* Estimate space for serialized ParamListInfo. */
416         param_len = EstimateParamListSpace(estate->es_param_list_info);
417         shm_toc_estimate_chunk(&pcxt->estimator, param_len);
418         shm_toc_estimate_keys(&pcxt->estimator, 1);
419
420         /*
421          * Estimate space for BufferUsage.
422          *
423          * If EXPLAIN is not in use and there are no extensions loaded that care,
424          * we could skip this.  But we have no way of knowing whether anyone's
425          * looking at pgBufferUsage, so do it unconditionally.
426          */
427         shm_toc_estimate_chunk(&pcxt->estimator,
428                                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
429         shm_toc_estimate_keys(&pcxt->estimator, 1);
430
431         /* Estimate space for tuple queues. */
432         shm_toc_estimate_chunk(&pcxt->estimator,
433                                                 mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
434         shm_toc_estimate_keys(&pcxt->estimator, 1);
435
436         /*
437          * Give parallel-aware nodes a chance to add to the estimates, and get a
438          * count of how many PlanState nodes there are.
439          */
440         e.pcxt = pcxt;
441         e.nnodes = 0;
442         ExecParallelEstimate(planstate, &e);
443
444         /* Estimate space for instrumentation, if required. */
445         if (estate->es_instrument)
446         {
447                 instrumentation_len =
448                         offsetof(SharedExecutorInstrumentation, plan_node_id) +
449                         sizeof(int) * e.nnodes;
450                 instrumentation_len = MAXALIGN(instrumentation_len);
451                 instrument_offset = instrumentation_len;
452                 instrumentation_len +=
453                         mul_size(sizeof(Instrumentation),
454                                          mul_size(e.nnodes, nworkers));
455                 shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
456                 shm_toc_estimate_keys(&pcxt->estimator, 1);
457         }
458
459         /* Estimate space for DSA area. */
460         shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
461         shm_toc_estimate_keys(&pcxt->estimator, 1);
462
463         /* Everyone's had a chance to ask for space, so now create the DSM. */
464         InitializeParallelDSM(pcxt);
465
466         /*
467          * OK, now we have a dynamic shared memory segment, and it should be big
468          * enough to store all of the data we estimated we would want to put into
469          * it, plus whatever general stuff (not specifically executor-related) the
470          * ParallelContext itself needs to store there.  None of the space we
471          * asked for has been allocated or initialized yet, though, so do that.
472          */
473
474         /* Store query string */
475         query_string = shm_toc_allocate(pcxt->toc, query_len);
476         memcpy(query_string, estate->es_sourceText, query_len);
477         shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
478
479         /* Store serialized PlannedStmt. */
480         pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
481         memcpy(pstmt_space, pstmt_data, pstmt_len);
482         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
483
484         /* Store serialized ParamListInfo. */
485         param_space = shm_toc_allocate(pcxt->toc, param_len);
486         shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
487         SerializeParamList(estate->es_param_list_info, &param_space);
488
489         /* Allocate space for each worker's BufferUsage; no need to initialize. */
490         bufusage_space = shm_toc_allocate(pcxt->toc,
491                                                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
492         shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
493         pei->buffer_usage = bufusage_space;
494
495         /* Set up tuple queues. */
496         pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
497
498         /*
499          * If instrumentation options were supplied, allocate space for the data.
500          * It only gets partially initialized here; the rest happens during
501          * ExecParallelInitializeDSM.
502          */
503         if (estate->es_instrument)
504         {
505                 Instrumentation *instrument;
506                 int                     i;
507
508                 instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
509                 instrumentation->instrument_options = estate->es_instrument;
510                 instrumentation->instrument_offset = instrument_offset;
511                 instrumentation->num_workers = nworkers;
512                 instrumentation->num_plan_nodes = e.nnodes;
513                 instrument = GetInstrumentationArray(instrumentation);
514                 for (i = 0; i < nworkers * e.nnodes; ++i)
515                         InstrInit(&instrument[i], estate->es_instrument);
516                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
517                                            instrumentation);
518                 pei->instrumentation = instrumentation;
519         }
520
521         /*
522          * Create a DSA area that can be used by the leader and all workers.
523          * (However, if we failed to create a DSM and are using private memory
524          * instead, then skip this.)
525          */
526         if (pcxt->seg != NULL)
527         {
528                 char       *area_space;
529
530                 area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
531                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
532                 pei->area = dsa_create_in_place(area_space, dsa_minsize,
533                                                                                 LWTRANCHE_PARALLEL_QUERY_DSA,
534                                                                                 pcxt->seg);
535         }
536
537         /*
538          * Make the area available to executor nodes running in the leader.  See
539          * also ParallelQueryMain which makes it available to workers.
540          */
541         estate->es_query_dsa = pei->area;
542
543         /*
544          * Give parallel-aware nodes a chance to initialize their shared data.
545          * This also fills in the plan_node_id array in *instrumentation, if
546          * we're collecting instrumentation.
547          */
548         d.pcxt = pcxt;
549         d.instrumentation = instrumentation;
550         d.nnodes = 0;
551         ExecParallelInitializeDSM(planstate, &d);
552
553         /*
554  * Make sure that the world hasn't shifted under our feet.  This could
555          * probably just be an Assert(), but let's be conservative for now.
556          */
557         if (e.nnodes != d.nnodes)
558                 elog(ERROR, "inconsistent count of PlanState nodes");
559
560         /* OK, we're ready to rock and roll. */
561         return pei;
562 }
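
/*
 * For reference, the leader-side life cycle of a ParallelExecutorInfo, roughly
 * as nodeGather.c drives it (the "gatherstate" variable here is hypothetical):
 *
 *		pei = ExecInitParallelPlan(outerPlanState(gatherstate), estate, nworkers);
 *		LaunchParallelWorkers(pei->pcxt);
 *		... read tuples from pei->tqueue[] via tqueue.c readers ...
 *		ExecParallelFinish(pei);		(wait for workers, collect statistics)
 *		ExecParallelCleanup(pei);		(or ExecParallelReinitialize before a rescan)
 */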
563
564 /*
565  * Copy instrumentation information about this node and its descendants from
566  * dynamic shared memory.
567  */
568 static bool
569 ExecParallelRetrieveInstrumentation(PlanState *planstate,
570                                                           SharedExecutorInstrumentation *instrumentation)
571 {
572         Instrumentation *instrument;
573         int                     i;
574         int                     n;
575         int                     ibytes;
576         int                     plan_node_id = planstate->plan->plan_node_id;
577         MemoryContext oldcontext;
578
579         /* Find the instrumentation for this node. */
580         for (i = 0; i < instrumentation->num_plan_nodes; ++i)
581                 if (instrumentation->plan_node_id[i] == plan_node_id)
582                         break;
583         if (i >= instrumentation->num_plan_nodes)
584                 elog(ERROR, "plan node %d not found", plan_node_id);
585
586         /* Accumulate the statistics from all workers. */
587         instrument = GetInstrumentationArray(instrumentation);
588         instrument += i * instrumentation->num_workers;
589         for (n = 0; n < instrumentation->num_workers; ++n)
590                 InstrAggNode(planstate->instrument, &instrument[n]);
591
592         /*
593          * Also store the per-worker detail.
594          *
595          * Worker instrumentation should be allocated in the same context as
596          * the regular instrumentation information, which is the per-query
597          * context. Switch into per-query memory context.
598          */
599         oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
600         ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
601         planstate->worker_instrument =
602                 palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
603         MemoryContextSwitchTo(oldcontext);
604
605         planstate->worker_instrument->num_workers = instrumentation->num_workers;
606         memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
607
608         return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
609                                                                  instrumentation);
610 }
611
612 /*
613  * Finish parallel execution.  We wait for parallel workers to finish, and
614  * accumulate their buffer usage and instrumentation.
615  */
616 void
617 ExecParallelFinish(ParallelExecutorInfo *pei)
618 {
619         int                     i;
620
621         if (pei->finished)
622                 return;
623
624         /* First, wait for the workers to finish. */
625         WaitForParallelWorkersToFinish(pei->pcxt);
626
627         /* Next, accumulate buffer usage. */
628         for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
629                 InstrAccumParallelQuery(&pei->buffer_usage[i]);
630
631         /* Finally, accumulate instrumentation, if any. */
632         if (pei->instrumentation)
633                 ExecParallelRetrieveInstrumentation(pei->planstate,
634                                                                                         pei->instrumentation);
635
636         pei->finished = true;
637 }
638
639 /*
640  * Clean up whatever ParallelExecutorInfo resources still exist after
641  * ExecParallelFinish.  We separate these routines because someone might
642  * want to examine the contents of the DSM after ExecParallelFinish and
643  * before calling this routine.
644  */
645 void
646 ExecParallelCleanup(ParallelExecutorInfo *pei)
647 {
648         if (pei->area != NULL)
649         {
650                 dsa_detach(pei->area);
651                 pei->area = NULL;
652         }
653         if (pei->pcxt != NULL)
654         {
655                 DestroyParallelContext(pei->pcxt);
656                 pei->pcxt = NULL;
657         }
658         pfree(pei);
659 }
660
661 /*
662  * Create a DestReceiver to write tuples we produce to the shm_mq designated
663  * for that purpose.
664  */
665 static DestReceiver *
666 ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
667 {
668         char       *mqspace;
669         shm_mq     *mq;
670
671         mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE);
672         mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
673         mq = (shm_mq *) mqspace;
674         shm_mq_set_sender(mq, MyProc);
675         return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
676 }
677
678 /*
679  * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
680  */
681 static QueryDesc *
682 ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
683                                                  int instrument_options)
684 {
685         char       *pstmtspace;
686         char       *paramspace;
687         PlannedStmt *pstmt;
688         ParamListInfo paramLI;
689         char       *queryString;
690
691         /* Get the query string from shared memory */
692         queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT);
693
694         /* Reconstruct leader-supplied PlannedStmt. */
695         pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
696         pstmt = (PlannedStmt *) stringToNode(pstmtspace);
697
698         /* Reconstruct ParamListInfo. */
699         paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS);
700         paramLI = RestoreParamList(&paramspace);
701
702         /*
703          * Create a QueryDesc for the query.
704          *
705          * Use the query text that the leader stored in shared memory and that we
706          * looked up above, so that this worker's activity (debug_query_string,
707          * pg_stat_activity) is reported under the same query string the leader
708          * is running.
709          */
710         return CreateQueryDesc(pstmt,
711                                                    queryString,
712                                                    GetActiveSnapshot(), InvalidSnapshot,
713                                                    receiver, paramLI, instrument_options);
714 }
715
716 /*
717  * Copy instrumentation information from this node and its descendants into
718  * dynamic shared memory, so that the parallel leader can retrieve it.
719  */
720 static bool
721 ExecParallelReportInstrumentation(PlanState *planstate,
722                                                           SharedExecutorInstrumentation *instrumentation)
723 {
724         int                     i;
725         int                     plan_node_id = planstate->plan->plan_node_id;
726         Instrumentation *instrument;
727
728         InstrEndLoop(planstate->instrument);
729
730         /*
731          * If we shuffled the values in the plan_node_id array into sorted
732          * order, we could use binary search here.  This might matter someday if
733          * we're pushing down sufficiently large plan trees.  For now, do it the
734          * slow, dumb way.
735          */
736         for (i = 0; i < instrumentation->num_plan_nodes; ++i)
737                 if (instrumentation->plan_node_id[i] == plan_node_id)
738                         break;
739         if (i >= instrumentation->num_plan_nodes)
740                 elog(ERROR, "plan node %d not found", plan_node_id);
741
742         /*
743          * Add our statistics to the per-node, per-worker totals.  It's possible
744          * that this could happen more than once if we relaunched workers.
745          */
746         instrument = GetInstrumentationArray(instrumentation);
747         instrument += i * instrumentation->num_workers;
748         Assert(IsParallelWorker());
749         Assert(ParallelWorkerNumber < instrumentation->num_workers);
750         InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
751
752         return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
753                                                                  instrumentation);
754 }
755
756 /*
757  * Initialize the PlanState and its descendants with the information
758  * retrieved from shared memory.  This has to be done once the PlanState
759  * is allocated and initialized by the executor; that is, after ExecutorStart().
760  */
761 static bool
762 ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
763 {
764         if (planstate == NULL)
765                 return false;
766
767         /* Call initializers for parallel-aware plan nodes. */
768         if (planstate->plan->parallel_aware)
769         {
770                 switch (nodeTag(planstate))
771                 {
772                         case T_SeqScanState:
773                                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
774                                 break;
775                         case T_IndexScanState:
776                                 ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc);
777                                 break;
778                         case T_IndexOnlyScanState:
779                                 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, toc);
780                                 break;
781                         case T_ForeignScanState:
782                                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
783                                                                                                 toc);
784                                 break;
785                         case T_CustomScanState:
786                                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
787                                                                                            toc);
788                                 break;
789                         case T_BitmapHeapScanState:
790                                 ExecBitmapHeapInitializeWorker(
791                                                                          (BitmapHeapScanState *) planstate, toc);
792                                 break;
793                         default:
794                                 break;
795                 }
796         }
797
798         return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
799 }
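
/*
 * For illustration: the corresponding node-level InitializeWorker callback
 * runs in each worker and looks up the shared state that the leader published
 * in its InitializeDSM callback, using the same plan_node_id key.  A minimal
 * sketch, continuing the hypothetical names used earlier:
 *
 *		void
 *		ExecSomeScanInitializeWorker(SomeScanState *node, shm_toc *toc)
 *		{
 *			node->shared_info =
 *				shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
 *		}
 */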
800
801 /*
802  * Main entrypoint for parallel query worker processes.
803  *
804  * We reach this function from ParallelWorkerMain, so the setup necessary to
805  * create a sensible parallel environment has already been done;
806  * ParallelWorkerMain worries about stuff like the transaction state, combo
807  * CID mappings, and GUC values, so we don't need to deal with any of that
808  * here.
809  *
810  * Our job is to deal with concerns specific to the executor.  The parallel
811  * group leader will have stored a serialized PlannedStmt, and it's our job
812  * to execute that plan and write the resulting tuples to the appropriate
813  * tuple queue.  Various bits of supporting information that we need in order
814  * to do this are also stored in the dsm_segment and can be accessed through
815  * the shm_toc.
816  */
817 static void
818 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
819 {
820         BufferUsage *buffer_usage;
821         DestReceiver *receiver;
822         QueryDesc  *queryDesc;
823         SharedExecutorInstrumentation *instrumentation;
824         int                     instrument_options = 0;
825         void       *area_space;
826         dsa_area   *area;
827
828         /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
829         receiver = ExecParallelGetReceiver(seg, toc);
830         instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION);
831         if (instrumentation != NULL)
832                 instrument_options = instrumentation->instrument_options;
833         queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
834
835         /* Set debug_query_string for this worker */
836         debug_query_string = queryDesc->sourceText;
837
838         /* Report the query as this worker's activity, for monitoring purposes */
839         pgstat_report_activity(STATE_RUNNING, debug_query_string);
840
841         /* Prepare to track buffer usage during query execution. */
842         InstrStartParallelQuery();
843
844         /* Attach to the dynamic shared memory area. */
845         area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA);
846         area = dsa_attach_in_place(area_space, seg);
847
848         /* Start up the executor */
849         ExecutorStart(queryDesc, 0);
850
851         /* Special executor initialization steps for parallel workers */
852         queryDesc->planstate->state->es_query_dsa = area;
853         ExecParallelInitializeWorker(queryDesc->planstate, toc);
854
855         /* Run the plan */
856         ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
857
858         /* Shut down the executor */
859         ExecutorFinish(queryDesc);
860
861         /* Report buffer usage during parallel execution. */
862         buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
863         InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
864
865         /* Report instrumentation data if any instrumentation options are set. */
866         if (instrumentation != NULL)
867                 ExecParallelReportInstrumentation(queryDesc->planstate,
868                                                                                   instrumentation);
869
870         /* Must do this after capturing instrumentation. */
871         ExecutorEnd(queryDesc);
872
873         /* Cleanup. */
874         dsa_detach(area);
875         FreeQueryDesc(queryDesc);
876         (*receiver->rDestroy) (receiver);
877 }