]> granicus.if.org Git - postgresql/blob - src/backend/executor/nodeMergeAppend.c
Faster expression evaluation and targetlist projection.
[postgresql] / src / backend / executor / nodeMergeAppend.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *        routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *              ExecInitMergeAppend             - initialize the MergeAppend node
17  *              ExecMergeAppend                 - retrieve the next tuple from the node
18  *              ExecEndMergeAppend              - shut down the MergeAppend node
19  *              ExecReScanMergeAppend   - rescan the MergeAppend node
20  *
21  *       NOTES
22  *              A MergeAppend node contains a list of one or more subplans.
23  *              These are each expected to deliver tuples that are sorted according
24  *              to a common sort key.  The MergeAppend node merges these streams
25  *              to produce output sorted the same way.
26  *
27  *              MergeAppend nodes don't make use of their left and right
28  *              subtrees, rather they maintain a list of subplans so
29  *              a typical MergeAppend node looks like this in the plan tree:
30  *
31  *                                 ...
32  *                                 /
33  *                              MergeAppend---+------+------+--- nil
34  *                              /       \                 |              |              |
35  *                        nil   nil              ...    ...    ...
36  *                                                               subplans
37  */
38
39 #include "postgres.h"
40
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
43
44 #include "lib/binaryheap.h"
45
46 /*
47  * We have one slot for each item in the heap array.  We use SlotNumber
48  * to store slot indexes.  This doesn't actually provide any formal
49  * type-safety, but it makes the code more self-documenting.
50  */
51 typedef int32 SlotNumber;
52
53 static int      heap_compare_slots(Datum a, Datum b, void *arg);
54
55
56 /* ----------------------------------------------------------------
57  *              ExecInitMergeAppend
58  *
59  *              Begin all of the subscans of the MergeAppend node.
60  * ----------------------------------------------------------------
61  */
62 MergeAppendState *
63 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
64 {
65         MergeAppendState *mergestate = makeNode(MergeAppendState);
66         PlanState **mergeplanstates;
67         int                     nplans;
68         int                     i;
69         ListCell   *lc;
70
71         /* check for unsupported flags */
72         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
73
74         /*
75          * Lock the non-leaf tables in the partition tree controlled by this
76          * node.  It's a no-op for non-partitioned parent tables.
77          */
78         ExecLockNonLeafAppendTables(node->partitioned_rels, estate);
79
80         /*
81          * Set up empty vector of subplan states
82          */
83         nplans = list_length(node->mergeplans);
84
85         mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
86
87         /*
88          * create new MergeAppendState for our node
89          */
90         mergestate->ps.plan = (Plan *) node;
91         mergestate->ps.state = estate;
92         mergestate->mergeplans = mergeplanstates;
93         mergestate->ms_nplans = nplans;
94
95         mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
96         mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
97                                                                                           mergestate);
98
99         /*
100          * Miscellaneous initialization
101          *
102          * MergeAppend plans don't have expression contexts because they never
103          * call ExecQual or ExecProject.
104          */
105
106         /*
107          * MergeAppend nodes do have Result slots, which hold pointers to tuples,
108          * so we have to initialize them.
109          */
110         ExecInitResultTupleSlot(estate, &mergestate->ps);
111
112         /*
113          * call ExecInitNode on each of the plans to be executed and save the
114          * results into the array "mergeplans".
115          */
116         i = 0;
117         foreach(lc, node->mergeplans)
118         {
119                 Plan       *initNode = (Plan *) lfirst(lc);
120
121                 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
122                 i++;
123         }
124
125         /*
126          * initialize output tuple type
127          */
128         ExecAssignResultTypeFromTL(&mergestate->ps);
129         mergestate->ps.ps_ProjInfo = NULL;
130
131         /*
132          * initialize sort-key information
133          */
134         mergestate->ms_nkeys = node->numCols;
135         mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
136
137         for (i = 0; i < node->numCols; i++)
138         {
139                 SortSupport sortKey = mergestate->ms_sortkeys + i;
140
141                 sortKey->ssup_cxt = CurrentMemoryContext;
142                 sortKey->ssup_collation = node->collations[i];
143                 sortKey->ssup_nulls_first = node->nullsFirst[i];
144                 sortKey->ssup_attno = node->sortColIdx[i];
145
146                 /*
147                  * It isn't feasible to perform abbreviated key conversion, since
148                  * tuples are pulled into mergestate's binary heap as needed.  It
149                  * would likely be counter-productive to convert tuples into an
150                  * abbreviated representation as they're pulled up, so opt out of that
151                  * additional optimization entirely.
152                  */
153                 sortKey->abbreviate = false;
154
155                 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
156         }
157
158         /*
159          * initialize to show we have not run the subplans yet
160          */
161         mergestate->ms_initialized = false;
162
163         return mergestate;
164 }
165
166 /* ----------------------------------------------------------------
167  *         ExecMergeAppend
168  *
169  *              Handles iteration over multiple subplans.
170  * ----------------------------------------------------------------
171  */
172 TupleTableSlot *
173 ExecMergeAppend(MergeAppendState *node)
174 {
175         TupleTableSlot *result;
176         SlotNumber      i;
177
178         if (!node->ms_initialized)
179         {
180                 /*
181                  * First time through: pull the first tuple from each subplan, and set
182                  * up the heap.
183                  */
184                 for (i = 0; i < node->ms_nplans; i++)
185                 {
186                         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
187                         if (!TupIsNull(node->ms_slots[i]))
188                                 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
189                 }
190                 binaryheap_build(node->ms_heap);
191                 node->ms_initialized = true;
192         }
193         else
194         {
195                 /*
196                  * Otherwise, pull the next tuple from whichever subplan we returned
197                  * from last time, and reinsert the subplan index into the heap,
198                  * because it might now compare differently against the existing
199                  * elements of the heap.  (We could perhaps simplify the logic a bit
200                  * by doing this before returning from the prior call, but it's better
201                  * to not pull tuples until necessary.)
202                  */
203                 i = DatumGetInt32(binaryheap_first(node->ms_heap));
204                 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
205                 if (!TupIsNull(node->ms_slots[i]))
206                         binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
207                 else
208                         (void) binaryheap_remove_first(node->ms_heap);
209         }
210
211         if (binaryheap_empty(node->ms_heap))
212         {
213                 /* All the subplans are exhausted, and so is the heap */
214                 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
215         }
216         else
217         {
218                 i = DatumGetInt32(binaryheap_first(node->ms_heap));
219                 result = node->ms_slots[i];
220         }
221
222         return result;
223 }
224
225 /*
226  * Compare the tuples in the two given slots.
227  */
228 static int32
229 heap_compare_slots(Datum a, Datum b, void *arg)
230 {
231         MergeAppendState *node = (MergeAppendState *) arg;
232         SlotNumber      slot1 = DatumGetInt32(a);
233         SlotNumber      slot2 = DatumGetInt32(b);
234
235         TupleTableSlot *s1 = node->ms_slots[slot1];
236         TupleTableSlot *s2 = node->ms_slots[slot2];
237         int                     nkey;
238
239         Assert(!TupIsNull(s1));
240         Assert(!TupIsNull(s2));
241
242         for (nkey = 0; nkey < node->ms_nkeys; nkey++)
243         {
244                 SortSupport sortKey = node->ms_sortkeys + nkey;
245                 AttrNumber      attno = sortKey->ssup_attno;
246                 Datum           datum1,
247                                         datum2;
248                 bool            isNull1,
249                                         isNull2;
250                 int                     compare;
251
252                 datum1 = slot_getattr(s1, attno, &isNull1);
253                 datum2 = slot_getattr(s2, attno, &isNull2);
254
255                 compare = ApplySortComparator(datum1, isNull1,
256                                                                           datum2, isNull2,
257                                                                           sortKey);
258                 if (compare != 0)
259                         return -compare;
260         }
261         return 0;
262 }
263
264 /* ----------------------------------------------------------------
265  *              ExecEndMergeAppend
266  *
267  *              Shuts down the subscans of the MergeAppend node.
268  *
269  *              Returns nothing of interest.
270  * ----------------------------------------------------------------
271  */
272 void
273 ExecEndMergeAppend(MergeAppendState *node)
274 {
275         PlanState **mergeplans;
276         int                     nplans;
277         int                     i;
278
279         /*
280          * get information from the node
281          */
282         mergeplans = node->mergeplans;
283         nplans = node->ms_nplans;
284
285         /*
286          * shut down each of the subscans
287          */
288         for (i = 0; i < nplans; i++)
289                 ExecEndNode(mergeplans[i]);
290 }
291
292 void
293 ExecReScanMergeAppend(MergeAppendState *node)
294 {
295         int                     i;
296
297         for (i = 0; i < node->ms_nplans; i++)
298         {
299                 PlanState  *subnode = node->mergeplans[i];
300
301                 /*
302                  * ExecReScan doesn't know about my subplans, so I have to do
303                  * changed-parameter signaling myself.
304                  */
305                 if (node->ps.chgParam != NULL)
306                         UpdateChangedParamSet(subnode, node->ps.chgParam);
307
308                 /*
309                  * If chgParam of subnode is not null then plan will be re-scanned by
310                  * first ExecProcNode.
311                  */
312                 if (subnode->chgParam == NULL)
313                         ExecReScan(subnode);
314         }
315         binaryheap_reset(node->ms_heap);
316         node->ms_initialized = false;
317 }