]> granicus.if.org Git - postgresql/blob - src/backend/executor/nodeMergeAppend.c
Support MergeAppend plans, to allow sorted output from append relations.
[postgresql] / src / backend / executor / nodeMergeAppend.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *        routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *              ExecInitMergeAppend             - initialize the MergeAppend node
17  *              ExecMergeAppend                 - retrieve the next tuple from the node
18  *              ExecEndMergeAppend              - shut down the MergeAppend node
19  *              ExecReScanMergeAppend   - rescan the MergeAppend node
20  *
21  *       NOTES
22  *              A MergeAppend node contains a list of one or more subplans.
23  *              These are each expected to deliver tuples that are sorted according
24  *              to a common sort key.  The MergeAppend node merges these streams
25  *              to produce output sorted the same way.
26  *
27  *              MergeAppend nodes don't make use of their left and right
28  *              subtrees, rather they maintain a list of subplans so
29  *              a typical MergeAppend node looks like this in the plan tree:
30  *
31  *                                 ...
32  *                                 /
33  *                              MergeAppend---+------+------+--- nil
34  *                              /       \     |      |      |
35  *                            nil       nil   ...    ...    ...
36  *                                               subplans
37  */
38
39 #include "postgres.h"
40
41 #include "access/nbtree.h"
42 #include "executor/execdebug.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "utils/lsyscache.h"
45
46 /*
47  * It gets quite confusing having a heap array (indexed by integers) which
48  * contains integers which index into the slots array. These typedefs try to
49  * clear it up, but they're only documentation.
50  */
51 typedef int             SlotNumber;
52 typedef int             HeapPosition;
53
54 static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
55 static void heap_siftup_slot(MergeAppendState *node);
56 static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
57
58
59 /* ----------------------------------------------------------------
60  *              ExecInitMergeAppend
61  *
62  *              Begin all of the subscans of the MergeAppend node.
63  * ----------------------------------------------------------------
64  */
65 MergeAppendState *
66 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
67 {
68         MergeAppendState *mergestate = makeNode(MergeAppendState);
69         PlanState **mergeplanstates;
70         int                     nplans;
71         int                     i;
72         ListCell   *lc;
73
74         /* check for unsupported flags */
75         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76
77         /*
78          * Set up empty vector of subplan states
79          */
80         nplans = list_length(node->mergeplans);
81
82         mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
83
84         /*
85          * create new MergeAppendState for our node
86          */
87         mergestate->ps.plan = (Plan *) node;
88         mergestate->ps.state = estate;
89         mergestate->mergeplans = mergeplanstates;
90         mergestate->ms_nplans = nplans;
91
92         mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
93         mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
94
95         /*
96          * Miscellaneous initialization
97          *
98          * MergeAppend plans don't have expression contexts because they never
99          * call ExecQual or ExecProject.
100          */
101
102         /*
103          * MergeAppend nodes do have Result slots, which hold pointers to tuples,
104          * so we have to initialize them.
105          */
106         ExecInitResultTupleSlot(estate, &mergestate->ps);
107
108         /*
109          * call ExecInitNode on each of the plans to be executed and save the
110          * results into the array "mergeplans".
111          */
112         i = 0;
113         foreach(lc, node->mergeplans)
114         {
115                 Plan       *initNode = (Plan *) lfirst(lc);
116
117                 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
118                 i++;
119         }
120
121         /*
122          * initialize output tuple type
123          */
124         ExecAssignResultTypeFromTL(&mergestate->ps);
125         mergestate->ps.ps_ProjInfo = NULL;
126
127         /*
128          * initialize sort-key information
129          */
130         mergestate->ms_nkeys = node->numCols;
131         mergestate->ms_scankeys = palloc0(sizeof(ScanKeyData) *  node->numCols);
132
133         for (i = 0; i < node->numCols; i++)
134         {
135                 Oid             sortFunction;
136                 bool    reverse;
137
138                 if (!get_compare_function_for_ordering_op(node->sortOperators[i],
139                                                                                                   &sortFunction, &reverse))
140                         elog(ERROR, "operator %u is not a valid ordering operator",
141                                  node->sortOperators[i]);
142
143                 /*
144                  * We needn't fill in sk_strategy or sk_subtype since these scankeys
145                  * will never be passed to an index.
146                  */
147                 ScanKeyInit(&mergestate->ms_scankeys[i],
148                                         node->sortColIdx[i],
149                                         InvalidStrategy,
150                                         sortFunction,
151                                         (Datum) 0);
152
153                 /* However, we use btree's conventions for encoding directionality */
154                 if (reverse)
155                         mergestate->ms_scankeys[i].sk_flags |= SK_BT_DESC;
156                 if (node->nullsFirst[i])
157                         mergestate->ms_scankeys[i].sk_flags |= SK_BT_NULLS_FIRST;
158         }
159
160         /*
161          * initialize to show we have not run the subplans yet
162          */
163         mergestate->ms_heap_size = 0;
164         mergestate->ms_initialized = false;
165         mergestate->ms_last_slot = -1;
166
167         return mergestate;
168 }
169
170 /* ----------------------------------------------------------------
171  *         ExecMergeAppend
172  *
173  *              Handles iteration over multiple subplans.
174  * ----------------------------------------------------------------
175  */
176 TupleTableSlot *
177 ExecMergeAppend(MergeAppendState *node)
178 {
179         TupleTableSlot *result;
180         SlotNumber      i;
181
182         if (!node->ms_initialized)
183         {
184                 /*
185                  * First time through: pull the first tuple from each subplan,
186                  * and set up the heap.
187                  */
188                 for (i = 0; i < node->ms_nplans; i++)
189                 {
190                         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
191                         if (!TupIsNull(node->ms_slots[i]))
192                                 heap_insert_slot(node, i);
193                 }
194                 node->ms_initialized = true;
195         }
196         else
197         {
198                 /*
199                  * Otherwise, pull the next tuple from whichever subplan we returned
200                  * from last time, and insert it into the heap.  (We could simplify
201                  * the logic a bit by doing this before returning from the prior call,
202                  * but it's better to not pull tuples until necessary.)
203                  */
204                 i = node->ms_last_slot;
205                 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
206                 if (!TupIsNull(node->ms_slots[i]))
207                         heap_insert_slot(node, i);
208         }
209
210         if (node->ms_heap_size > 0)
211         {
212                 /* Return the topmost heap node, and sift up the remaining nodes */
213                 i = node->ms_heap[0];
214                 result = node->ms_slots[i];
215                 node->ms_last_slot = i;
216                 heap_siftup_slot(node);
217         }
218         else
219         {
220                 /* All the subplans are exhausted, and so is the heap */
221                 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
222         }
223
224         return result;
225 }
226
227 /*
228  * Insert a new slot into the heap.  The slot must contain a valid tuple.
229  */
230 static void
231 heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
232 {
233         SlotNumber *heap = node->ms_heap;
234         HeapPosition j;
235
236         Assert(!TupIsNull(node->ms_slots[new_slot]));
237
238         j = node->ms_heap_size++;       /* j is where the "hole" is */
239         while (j > 0)
240         {
241                 int             i = (j-1)/2;
242
243                 if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
244                         break;
245                 heap[j] = heap[i];
246                 j = i;
247         }
248         heap[j] = new_slot;
249 }
250
251 /*
252  * Delete the heap top (the slot in heap[0]), and sift up.
253  */
254 static void
255 heap_siftup_slot(MergeAppendState *node)
256 {
257         SlotNumber *heap = node->ms_heap;
258         HeapPosition i,
259                                 n;
260
261         if (--node->ms_heap_size <= 0)
262                 return;
263         n = node->ms_heap_size;         /* heap[n] needs to be reinserted */
264         i = 0;                                          /* i is where the "hole" is */
265         for (;;)
266         {
267                 int             j = 2 * i + 1;
268
269                 if (j >= n)
270                         break;
271                 if (j+1 < n && heap_compare_slots(node, heap[j], heap[j+1]) > 0)
272                         j++;
273                 if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
274                         break;
275                 heap[i] = heap[j];
276                 i = j;
277         }
278         heap[i] = heap[n];
279 }
280
281 /*
282  * Compare the tuples in the two given slots.
283  */
284 static int32
285 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
286 {
287         TupleTableSlot *s1 = node->ms_slots[slot1];
288         TupleTableSlot *s2 = node->ms_slots[slot2];
289         int                     nkey;
290
291         Assert(!TupIsNull(s1));
292         Assert(!TupIsNull(s2));
293
294         for (nkey = 0; nkey < node->ms_nkeys; nkey++)
295         {
296                 ScanKey scankey = node->ms_scankeys + nkey;
297                 AttrNumber attno = scankey->sk_attno;
298                 Datum   datum1,
299                                 datum2;
300                 bool    isNull1,
301                                 isNull2;
302                 int32   compare;
303
304                 datum1 = slot_getattr(s1, attno, &isNull1);
305                 datum2 = slot_getattr(s2, attno, &isNull2);
306
307                 if (isNull1)
308                 {
309                         if (isNull2)
310                                 continue;               /* NULL "=" NULL */
311                         else if (scankey->sk_flags & SK_BT_NULLS_FIRST)
312                                 return -1;              /* NULL "<" NOT_NULL */
313                         else
314                                 return 1;               /* NULL ">" NOT_NULL */
315                 }
316                 else if (isNull2)
317                 {
318                         if (scankey->sk_flags & SK_BT_NULLS_FIRST)
319                                 return 1;               /* NOT_NULL ">" NULL */
320                         else
321                                 return -1;              /* NOT_NULL "<" NULL */
322                 }
323                 else
324                 {
325                         compare = DatumGetInt32(FunctionCall2(&scankey->sk_func,
326                                                                                                   datum1, datum2));
327                         if (compare != 0)
328                         {
329                                 if (scankey->sk_flags & SK_BT_DESC)
330                                         compare = -compare;
331                                 return compare;
332                         }
333                 }
334         }
335         return 0;
336 }
337
338 /* ----------------------------------------------------------------
339  *              ExecEndMergeAppend
340  *
341  *              Shuts down the subscans of the MergeAppend node.
342  *
343  *              Returns nothing of interest.
344  * ----------------------------------------------------------------
345  */
346 void
347 ExecEndMergeAppend(MergeAppendState *node)
348 {
349         PlanState **mergeplans;
350         int                     nplans;
351         int                     i;
352
353         /*
354          * get information from the node
355          */
356         mergeplans = node->mergeplans;
357         nplans = node->ms_nplans;
358
359         /*
360          * shut down each of the subscans
361          */
362         for (i = 0; i < nplans; i++)
363                 ExecEndNode(mergeplans[i]);
364 }
365
366 void
367 ExecReScanMergeAppend(MergeAppendState *node)
368 {
369         int                     i;
370
371         for (i = 0; i < node->ms_nplans; i++)
372         {
373                 PlanState  *subnode = node->mergeplans[i];
374
375                 /*
376                  * ExecReScan doesn't know about my subplans, so I have to do
377                  * changed-parameter signaling myself.
378                  */
379                 if (node->ps.chgParam != NULL)
380                         UpdateChangedParamSet(subnode, node->ps.chgParam);
381
382                 /*
383                  * If chgParam of subnode is not null then plan will be re-scanned by
384                  * first ExecProcNode.
385                  */
386                 if (subnode->chgParam == NULL)
387                         ExecReScan(subnode);
388         }
389         node->ms_heap_size = 0;
390         node->ms_initialized = false;
391         node->ms_last_slot = -1;
392 }