/*-------------------------------------------------------------------------
 *
 * nodeAppend.c
 *        routines to handle append nodes.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        src/backend/executor/nodeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *        ExecInitAppend   - initialize the append node
 *        ExecAppend       - retrieve the next tuple from the node
 *        ExecEndAppend    - shut down the append node
 *        ExecReScanAppend - rescan the append node
 *
 *      NOTES
 *        Each append node contains a list of one or more subplans which
 *        must be iteratively processed (forwards or backwards).
 *        Tuples are retrieved by executing the 'whichplan'th subplan
 *        until the subplan stops returning tuples, at which point that
 *        plan is shut down and the next started up.
 *
 *        Append nodes don't make use of their left and right
 *        subtrees, rather they maintain a list of subplans so
 *        a typical append node looks like this in the plan tree:
 *
 *                 ...
 *                 /
 *              Append -------+------+------+--- nil
 *              /     \       |      |      |
 *            nil     nil    ...    ...    ...
 *                               subplans
 *
 *        Append nodes are currently used for unions, and to support
 *        inheritance queries, where several relations need to be scanned.
 *        For example, in our standard person/student/employee/student-emp
 *        example, where student and employee inherit from person
 *        and student-emp inherits from student and employee, the
 *        query:
 *
 *                select name from person
 *
 *        generates the plan:
 *
 *                  |
 *              Append -------+--------+---------+-----------+
 *              /     \       |        |         |           |
 *            nil     nil    Scan     Scan      Scan        Scan
 *                            |        |         |           |
 *                         person  employee  student   student-emp
 */

#include "postgres.h"

#include "executor/execdebug.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"

/* Shared state for parallel-aware Append. */
struct ParallelAppendState
{
    LWLock      pa_lock;        /* mutual exclusion to choose next subplan */
    int         pa_next_plan;   /* next plan to choose by any worker */

    /*
     * pa_finished[i] should be true if no more workers should select subplan
     * i.  For a non-partial plan, this should be set to true as soon as a
     * worker selects the plan; for a partial plan, it remains false until
     * some worker executes the plan to completion.
     */
    bool        pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
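
/*
 * Illustrative sketch only (not used by the executor, and the function name
 * is hypothetical): it shows how a ParallelAppendState covering nplans
 * subplans might be sized and zero-filled, mirroring what ExecAppendEstimate
 * and ExecAppendInitializeDSM below do against the parallel-query DSM
 * segment; the real setup additionally runs LWLockInitialize on pa_lock.
 */
#ifdef NOT_USED
static ParallelAppendState *
make_parallel_append_state_sketch(int nplans)
{
    /* fixed-size header plus one pa_finished flag per subplan */
    Size        len = add_size(offsetof(ParallelAppendState, pa_finished),
                               sizeof(bool) * nplans);
    ParallelAppendState *pastate = (ParallelAppendState *) palloc0(len);

    /* palloc0 leaves pa_next_plan == 0 and every pa_finished[] flag false */
    return pastate;
}
#endif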

#define INVALID_SUBPLAN_INDEX       -1

static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);

/* ----------------------------------------------------------------
 *        ExecInitAppend
 *
 *        Begin all of the subscans of the append node.
 *
 *        (This is potentially wasteful, since the entire result of the
 *        append node may not be scanned, but this way all of the
 *        structures get allocated in the executor's top level memory
 *        block instead of that of the call to ExecAppend.)
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
    AppendState *appendstate = makeNode(AppendState);
    PlanState **appendplanstates;
    int         nplans;
    int         i;
    ListCell   *lc;

    /* check for unsupported flags */
    Assert(!(eflags & EXEC_FLAG_MARK));

    /*
     * Lock the non-leaf tables in the partition tree controlled by this node.
     * It's a no-op for non-partitioned parent tables.
     */
    ExecLockNonLeafAppendTables(node->partitioned_rels, estate);

    /*
     * Set up empty vector of subplan states
     */
    nplans = list_length(node->appendplans);

    appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));

    /*
     * create new AppendState for our append node
     */
    appendstate->ps.plan = (Plan *) node;
    appendstate->ps.state = estate;
    appendstate->ps.ExecProcNode = ExecAppend;
    appendstate->appendplans = appendplanstates;
    appendstate->as_nplans = nplans;

    /*
     * Miscellaneous initialization
     *
     * Append plans don't have expression contexts because they never call
     * ExecQual or ExecProject.
     */

    /*
     * append nodes still have Result slots, which hold pointers to tuples, so
     * we have to initialize them.
     */
    ExecInitResultTupleSlot(estate, &appendstate->ps);

    /*
     * call ExecInitNode on each of the plans to be executed and save the
     * results into the array "appendplans".
     */
    i = 0;
    foreach(lc, node->appendplans)
    {
        Plan       *initNode = (Plan *) lfirst(lc);

        appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
        i++;
    }

    /*
     * initialize output tuple type
     */
    ExecAssignResultTypeFromTL(&appendstate->ps);
    appendstate->ps.ps_ProjInfo = NULL;

    /*
     * Parallel-aware append plans must choose the first subplan to execute by
     * looking at shared memory, but non-parallel-aware append plans can
     * always start with the first subplan.
     */
    appendstate->as_whichplan =
        appendstate->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;

    /* If parallel-aware, this will be overridden later. */
    appendstate->choose_next_subplan = choose_next_subplan_locally;

    return appendstate;
}

/* ----------------------------------------------------------------
 *        ExecAppend
 *
 *        Handles iteration over multiple subplans.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
    AppendState *node = castNode(AppendState, pstate);

    /* If no subplan has been chosen, we must choose one before proceeding. */
    if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
        !node->choose_next_subplan(node))
        return ExecClearTuple(node->ps.ps_ResultTupleSlot);

    for (;;)
    {
        PlanState  *subnode;
        TupleTableSlot *result;

        CHECK_FOR_INTERRUPTS();

        /*
         * figure out which subplan we are currently processing
         */
        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
        subnode = node->appendplans[node->as_whichplan];

        /*
         * get a tuple from the subplan
         */
        result = ExecProcNode(subnode);

        if (!TupIsNull(result))
        {
            /*
             * If the subplan gave us something then return it as-is. We do
             * NOT make use of the result slot that was set up in
             * ExecInitAppend; there's no need for it.
             */
            return result;
        }

        /* choose new subplan; if none, we're done */
        if (!node->choose_next_subplan(node))
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
    }
}

/* ----------------------------------------------------------------
 *        ExecEndAppend
 *
 *        Shuts down the subscans of the append node.
 *
 *        Returns nothing of interest.
 * ----------------------------------------------------------------
 */
void
ExecEndAppend(AppendState *node)
{
    PlanState **appendplans;
    int         nplans;
    int         i;

    /*
     * get information from the node
     */
    appendplans = node->appendplans;
    nplans = node->as_nplans;

    /*
     * shut down each of the subscans
     */
    for (i = 0; i < nplans; i++)
        ExecEndNode(appendplans[i]);
}

void
ExecReScanAppend(AppendState *node)
{
    int         i;

    for (i = 0; i < node->as_nplans; i++)
    {
        PlanState  *subnode = node->appendplans[i];

        /*
         * ExecReScan doesn't know about my subplans, so I have to do
         * changed-parameter signaling myself.
         */
        if (node->ps.chgParam != NULL)
            UpdateChangedParamSet(subnode, node->ps.chgParam);

        /*
         * If chgParam of subnode is not null then plan will be re-scanned by
         * first ExecProcNode.
         */
        if (subnode->chgParam == NULL)
            ExecReScan(subnode);
    }

    node->as_whichplan =
        node->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
}

/* ----------------------------------------------------------------
 *                        Parallel Append Support
 * ----------------------------------------------------------------
 */
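
/*
 * The functions below are reached through the generic parallel-executor
 * machinery in execParallel.c: ExecAppendEstimate and ExecAppendInitializeDSM
 * run in the leader while the Gather or Gather Merge node sets up the
 * parallel-query DSM segment, ExecAppendReInitializeDSM runs in the leader
 * before the plan is rescanned, and ExecAppendInitializeWorker runs in each
 * worker as it attaches to that segment.
 */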

/* ----------------------------------------------------------------
 *        ExecAppendEstimate
 *
 *        Compute the amount of space we'll need in the parallel
 *        query DSM, and inform pcxt->estimator about our needs.
 * ----------------------------------------------------------------
 */
void
ExecAppendEstimate(AppendState *node,
                   ParallelContext *pcxt)
{
    node->pstate_len =
        add_size(offsetof(ParallelAppendState, pa_finished),
                 sizeof(bool) * node->as_nplans);

    shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 *        ExecAppendInitializeDSM
 *
 *        Set up shared state for Parallel Append.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeDSM(AppendState *node,
                        ParallelContext *pcxt)
{
    ParallelAppendState *pstate;

    pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
    memset(pstate, 0, node->pstate_len);
    LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);

    node->as_pstate = pstate;
    node->choose_next_subplan = choose_next_subplan_for_leader;
}

/* ----------------------------------------------------------------
 *        ExecAppendReInitializeDSM
 *
 *        Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
{
    ParallelAppendState *pstate = node->as_pstate;

    pstate->pa_next_plan = 0;
    memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
}

/* ----------------------------------------------------------------
 *        ExecAppendInitializeWorker
 *
 *        Copy relevant information from TOC into planstate, and initialize
 *        whatever is required to choose and execute the optimal subplan.
 * ----------------------------------------------------------------
 */
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
    node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
    node->choose_next_subplan = choose_next_subplan_for_worker;
}

/* ----------------------------------------------------------------
 *        choose_next_subplan_locally
 *
 *        Choose next subplan for a non-parallel-aware Append,
 *        returning false if there are no more.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_locally(AppendState *node)
{
    int         whichplan = node->as_whichplan;

    /* We should never see INVALID_SUBPLAN_INDEX in this case. */
    Assert(whichplan >= 0 && whichplan <= node->as_nplans);

    if (ScanDirectionIsForward(node->ps.state->es_direction))
    {
        if (whichplan >= node->as_nplans - 1)
            return false;
        node->as_whichplan++;
    }
    else
    {
        if (whichplan <= 0)
            return false;
        node->as_whichplan--;
    }

    return true;
}

/* ----------------------------------------------------------------
 *        choose_next_subplan_for_leader
 *
 *        Try to pick a plan which doesn't commit us to doing much
 *        work locally, so that as much work as possible is done in
 *        the workers.  Cheapest subplans are at the end.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
    ParallelAppendState *pstate = node->as_pstate;
    Append     *append = (Append *) node->ps.plan;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
    {
        /* Mark just-completed subplan as finished. */
        node->as_pstate->pa_finished[node->as_whichplan] = true;
    }
    else
    {
        /* Start with last subplan. */
        node->as_whichplan = node->as_nplans - 1;
    }

    /* Loop until we find a subplan to execute. */
    while (pstate->pa_finished[node->as_whichplan])
    {
        if (node->as_whichplan == 0)
        {
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            node->as_whichplan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }
        node->as_whichplan--;
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < append->first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}
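
/*
 * Illustrative example for the leader's strategy above: with four subplans
 * of which 0 and 1 are non-partial and first_partial_plan == 2, the leader's
 * first call claims subplan 3, the cheapest one.  Being a partial plan it is
 * not marked finished, so workers may join in executing it.  Once the leader
 * has exhausted its share, the next call marks subplan 3 finished and walks
 * downward past any subplans other participants have already claimed, giving
 * up only after subplan 0 has been tried.
 */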

/* ----------------------------------------------------------------
 *        choose_next_subplan_for_worker
 *
 *        Choose next subplan for a parallel-aware Append, returning
 *        false if there are no more.
 *
 *        We start from the first plan and advance through the list;
 *        when we get back to the end, we loop back to the first
 *        partial plan.  This assigns the non-partial plans first
 *        in order of descending cost and then spreads out the
 *        workers as evenly as possible across the remaining partial
 *        plans.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
    ParallelAppendState *pstate = node->as_pstate;
    Append     *append = (Append *) node->ps.plan;

    /* Backward scan is not supported by parallel-aware plans */
    Assert(ScanDirectionIsForward(node->ps.state->es_direction));

    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

    /* Mark just-completed subplan as finished. */
    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    /* If all the plans are already done, we have nothing to do */
    if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
    {
        LWLockRelease(&pstate->pa_lock);
        return false;
    }

    /* Loop until we find a subplan to execute. */
    while (pstate->pa_finished[pstate->pa_next_plan])
    {
        if (pstate->pa_next_plan < node->as_nplans - 1)
        {
            /* Advance to next plan. */
            pstate->pa_next_plan++;
        }
        else if (append->first_partial_plan < node->as_nplans)
        {
            /* Loop back to first partial plan. */
            pstate->pa_next_plan = append->first_partial_plan;
        }
        else
        {
            /* At last plan, no partial plans, arrange to bail out. */
            pstate->pa_next_plan = node->as_whichplan;
        }

        if (pstate->pa_next_plan == node->as_whichplan)
        {
            /* We've tried everything! */
            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
            LWLockRelease(&pstate->pa_lock);
            return false;
        }
    }

    /* Pick the plan we found, and advance pa_next_plan one more time. */
    node->as_whichplan = pstate->pa_next_plan++;
    if (pstate->pa_next_plan >= node->as_nplans)
    {
        Assert(append->first_partial_plan < node->as_nplans);
        pstate->pa_next_plan = append->first_partial_plan;
    }

    /* If non-partial, immediately mark as finished. */
    if (node->as_whichplan < append->first_partial_plan)
        node->as_pstate->pa_finished[node->as_whichplan] = true;

    LWLockRelease(&pstate->pa_lock);

    return true;
}
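
/*
 * Continuing the illustrative four-subplan example (0 and 1 non-partial,
 * first_partial_plan == 2): the first worker to call the function above
 * claims subplan 0 and marks it finished so nobody else picks it, the next
 * claims subplan 1 the same way, and later calls hand out the partial
 * subplans 2 and 3.  When pa_next_plan runs off the end of the list it wraps
 * back to first_partial_plan, so any number of participants can share the
 * partial subplans until each has been run to completion and marked
 * finished; at that point pa_next_plan becomes INVALID_SUBPLAN_INDEX and
 * every subsequent call returns false.
 */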