/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *        Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * with the single-copy flag set.  In that case, the Gather node runs the
 * plan in a single worker and does not execute the plan itself; it simply
 * returns whatever tuples the worker produces.  If no worker can be
 * obtained, it runs the plan itself and returns the results.  Therefore, a
 * plan used with a single-copy Gather node need not be parallel-aware.
 *
 * IDENTIFICATION
 *        src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *              ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
    GatherState *gatherstate;
    Plan       *outerNode;
    bool        hasoid;
    TupleDesc   tupDesc;

    /* A Gather node has no inner plan. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gatherstate = makeNode(GatherState);
    gatherstate->ps.plan = (Plan *) node;
    gatherstate->ps.state = estate;
    gatherstate->ps.ExecProcNode = ExecGather;

    gatherstate->initialized = false;
    gatherstate->need_to_scan_locally =
        !node->single_copy && parallel_leader_participation;
    gatherstate->tuples_needed = -1;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gatherstate->ps);

    /*
     * Gather doesn't support checking a qual (it's always more efficient to
     * do it in the child node).
     */
    Assert(!node->plan.qual);

    /*
     * tuple table initialization
     */
    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
    ExecInitResultTupleSlot(estate, &gatherstate->ps);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);

    /*
     * Initialize funnel slot to same tuple descriptor as outer plan.
     */
    if (!ExecContextForcesOids(outerPlanState(gatherstate), &hasoid))
        hasoid = false;
    tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
    ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);

    /*
     * Initialize result tuple type and projection info.
     */
    ExecAssignResultTypeFromTL(&gatherstate->ps);
    ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

    return gatherstate;
}

/* ----------------------------------------------------------------
 *              ExecGather(node)
 *
 *              Scans the relation via multiple workers and returns
 *              the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
    GatherState *node = castNode(GatherState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;

    CHECK_FOR_INTERRUPTS();

    /*
     * Initialize the parallel context and workers on first execution.  We do
     * this here rather than during node initialization because it requires
     * allocating a large dynamic shared memory segment, which is better done
     * only if it is really needed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        Gather     *gather = (Gather *) node->ps.plan;

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gather->num_workers > 0 && estate->es_use_parallel_mode)
        {
            ParallelContext *pcxt;

            /* Initialize, or re-initialize, shared state needed by workers. */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                 estate,
                                                 gather->initParam,
                                                 gather->num_workers,
                                                 node->tuples_needed);
            else
                ExecParallelReinitialize(node->ps.lefttree,
                                         node->pei,
                                         gather->initParam);

            /*
             * Register backend workers. We might not get as many as we
             * requested, or indeed any at all.
             */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                ExecParallelCreateReaders(node->pei);
                /* Make a working array showing the active readers */
                node->nreaders = pcxt->nworkers_launched;
                node->reader = (TupleQueueReader **)
                    palloc(node->nreaders * sizeof(TupleQueueReader *));
                memcpy(node->reader, node->pei->reader,
                       node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
                node->nreaders = 0;
                node->reader = NULL;
            }
            node->nextreader = 0;
        }

        /*
         * Run the plan locally if we have no workers, or if leader
         * participation is enabled and this isn't a single-copy Gather.
         */
        node->need_to_scan_locally = (node->nreaders == 0)
            || (!gather->single_copy && parallel_leader_participation);
        node->initialized = true;
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /* If no projection is required, we're done. */
    if (node->ps.ps_ProjInfo == NULL)
        return slot;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}
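
/*
 * Illustrative sketch, not part of the original executor code: it restates
 * the need_to_scan_locally decision computed in ExecGather() above as a
 * standalone function, to make the three cases easier to see.  The function
 * and parameter names are invented for illustration only, and the #ifdef
 * guard (also invented) keeps the sketch out of any real build.
 */
#ifdef NODEGATHER_DOC_SKETCH
static bool
leader_should_scan_locally(int nreaders, bool single_copy,
                           bool leader_participation)
{
    /* With no worker queues to read from, the leader must run the plan. */
    if (nreaders == 0)
        return true;

    /* In single-copy mode exactly one copy runs, inside the worker. */
    if (single_copy)
        return false;

    /* Otherwise the leader joins in only when participation is enabled. */
    return leader_participation;
}
#endif                          /* NODEGATHER_DOC_SKETCH */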

/* ----------------------------------------------------------------
 *              ExecEndGather
 *
 *              frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
    ExecEndNode(outerPlanState(node));  /* let children clean up first */
    ExecShutdownGather(node);
    ExecFreeExprContext(&node->ps);
    ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
    PlanState  *outerPlan = outerPlanState(gatherstate);
    TupleTableSlot *outerTupleSlot;
    TupleTableSlot *fslot = gatherstate->funnel_slot;
    HeapTuple   tup;

    while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
    {
        CHECK_FOR_INTERRUPTS();

        if (gatherstate->nreaders > 0)
        {
            tup = gather_readnext(gatherstate);

            if (HeapTupleIsValid(tup))
            {
                ExecStoreTuple(tup,     /* tuple to store */
                               fslot,   /* slot in which to store the tuple */
                               InvalidBuffer,   /* buffer associated with
                                                 * this tuple */
                               true);   /* pfree tuple when done with it */
                return fslot;
            }
        }

        if (gatherstate->need_to_scan_locally)
        {
            outerTupleSlot = ExecProcNode(outerPlan);

            if (!TupIsNull(outerTupleSlot))
                return outerTupleSlot;

            gatherstate->need_to_scan_locally = false;
        }
    }

    return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
    int         nvisited = 0;

    for (;;)
    {
        TupleQueueReader *reader;
        HeapTuple   tup;
        bool        readerdone;

        /* Check for async events, particularly messages from workers. */
        CHECK_FOR_INTERRUPTS();

        /* Attempt to read a tuple, but don't block if none is available. */
        Assert(gatherstate->nextreader < gatherstate->nreaders);
        reader = gatherstate->reader[gatherstate->nextreader];
        tup = TupleQueueReaderNext(reader, true, &readerdone);

        /*
         * If this reader is done, remove it from our working array of active
         * readers.  If all readers are done, we're outta here.
         */
        if (readerdone)
        {
            Assert(!tup);
            --gatherstate->nreaders;
            if (gatherstate->nreaders == 0)
                return NULL;
            memmove(&gatherstate->reader[gatherstate->nextreader],
                    &gatherstate->reader[gatherstate->nextreader + 1],
                    sizeof(TupleQueueReader *)
                    * (gatherstate->nreaders - gatherstate->nextreader));
            if (gatherstate->nextreader >= gatherstate->nreaders)
                gatherstate->nextreader = 0;
            continue;
        }

        /* If we got a tuple, return it. */
        if (tup)
            return tup;

        /*
         * Advance nextreader pointer in round-robin fashion.  Note that we
         * only reach this code if we weren't able to get a tuple from the
         * current worker.  We used to advance the nextreader pointer after
         * every tuple, but it turns out to be much more efficient to keep
         * reading from the same queue until that would require blocking.
         */
        gatherstate->nextreader++;
        if (gatherstate->nextreader >= gatherstate->nreaders)
            gatherstate->nextreader = 0;

        /* Have we visited every (surviving) TupleQueueReader? */
        nvisited++;
        if (nvisited >= gatherstate->nreaders)
        {
            /*
             * If (still) running plan locally, return NULL so caller can
             * generate another tuple from the local copy of the plan.
             */
            if (gatherstate->need_to_scan_locally)
                return NULL;

            /* Nothing to do except wait for developments. */
            WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_EXECUTE_GATHER);
            ResetLatch(MyLatch);
            nvisited = 0;
        }
    }
}
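
/*
 * Illustrative sketch, not part of the original executor code: it shows the
 * polling strategy used by gather_readnext() above in isolation -- keep
 * draining the current queue until it would block, advance round-robin only
 * when the current queue is empty, and compact the array with memmove() as
 * queues are exhausted.  PollQueue, poll_nonblocking() and the parameter
 * names are invented for illustration; they are not PostgreSQL APIs.  The
 * #ifdef guard (also invented) keeps the sketch out of any real build.
 */
#ifdef NODEGATHER_DOC_SKETCH
typedef struct PollQueue PollQueue;

/* Hypothetical: returns an item or NULL; sets *done once the queue is exhausted. */
extern void *poll_nonblocking(PollQueue *q, bool *done);

static void *
round_robin_read(PollQueue **queues, int *nqueues, int *next)
{
    int         nvisited = 0;

    while (*nqueues > 0)
    {
        bool        done;
        void       *item = poll_nonblocking(queues[*next], &done);

        if (done)
        {
            /* Drop the exhausted queue and close the gap, as above. */
            --*nqueues;
            memmove(&queues[*next], &queues[*next + 1],
                    sizeof(PollQueue *) * (*nqueues - *next));
            if (*next >= *nqueues)
                *next = 0;
            continue;
        }

        /* Got an item: return it and stay on this queue next time. */
        if (item != NULL)
            return item;

        /* Queue empty but not done: move on round-robin. */
        if (++*next >= *nqueues)
            *next = 0;

        /* Visited every queue without progress: let the caller wait. */
        if (++nvisited >= *nqueues)
            return NULL;
    }

    return NULL;
}
#endif                          /* NODEGATHER_DOC_SKETCH */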

/* ----------------------------------------------------------------
 *              ExecShutdownGatherWorkers
 *
 *              Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);

    /* Flush local copy of reader array */
    if (node->reader)
        pfree(node->reader);
    node->reader = NULL;
}

/* ----------------------------------------------------------------
 *              ExecShutdownGather
 *
 *              Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
    ExecShutdownGatherWorkers(node);

    /* Now destroy the parallel context. */
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}

/* ----------------------------------------------------------------
 *                                              Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *              ExecReScanGather
 *
 *              Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
    Gather     *gather = (Gather *) node->ps.plan;
    PlanState  *outerPlan = outerPlanState(node);

    /* Make sure any existing workers are gracefully shut down */
    ExecShutdownGatherWorkers(node);

    /* Mark node so that shared state will be rebuilt at next call */
    node->initialized = false;

    /*
     * Set child node's chgParam to tell it that the next scan might deliver a
     * different set of rows within the leader process.  (The overall rowset
     * shouldn't change, but the leader process's subset might; hence nodes
     * between here and the parallel table scan node mustn't optimize on the
     * assumption of an unchanging rowset.)
     */
    if (gather->rescan_param >= 0)
        outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                             gather->rescan_param);

    /*
     * If chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.  Note: because this does nothing if we have a
     * rescan_param, it's currently guaranteed that parallel-aware child nodes
     * will not see a ReScan call until after they get a ReInitializeDSM call.
     * That ordering might not be something to rely on, though.  A good rule
     * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     * should reset only local state, and anything that depends on both of
     * those steps being finished must wait until the first ExecProcNode call.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}
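
/*
 * Illustrative sketch, not part of the original executor code: it reduces
 * the chgParam idea described in ExecReScanGather() above to a generic
 * pattern -- the parent marks a parameter as possibly changed, and a caching
 * child drops its cached result on the next fetch if it depends on any
 * changed parameter.  The types and names are invented for illustration;
 * PostgreSQL's real mechanism uses a Bitmapset keyed by the planner-assigned
 * rescan_param.  The #ifdef guard (also invented) keeps the sketch out of
 * any real build.
 */
#ifdef NODEGATHER_DOC_SKETCH
typedef struct CachingChild
{
    uint32      depends_on;     /* bitmask of parameter numbers we depend on */
    bool        cache_valid;    /* do we still trust our cached result? */
} CachingChild;

/* Parent side: note that parameter "paramno" (0..31 here) may have changed. */
static void
mark_param_changed(uint32 *chg_params, int paramno)
{
    *chg_params |= ((uint32) 1) << paramno;
}

/* Child side: call at the start of the next fetch, before using the cache. */
static void
maybe_invalidate_cache(CachingChild *child, uint32 *chg_params)
{
    if ((child->depends_on & *chg_params) != 0)
        child->cache_valid = false; /* force a fresh scan of our input */
    *chg_params &= ~child->depends_on;  /* acknowledge the notification */
}
#endif                          /* NODEGATHER_DOC_SKETCH */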