]> granicus.if.org Git - postgresql/blob - src/backend/executor/nodeMergeAppend.c
Create a "sort support" interface API for faster sorting.
[postgresql] / src / backend / executor / nodeMergeAppend.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  *        routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  *              ExecInitMergeAppend             - initialize the MergeAppend node
17  *              ExecMergeAppend                 - retrieve the next tuple from the node
18  *              ExecEndMergeAppend              - shut down the MergeAppend node
19  *              ExecReScanMergeAppend   - rescan the MergeAppend node
20  *
21  *       NOTES
22  *              A MergeAppend node contains a list of one or more subplans.
23  *              These are each expected to deliver tuples that are sorted according
24  *              to a common sort key.  The MergeAppend node merges these streams
25  *              to produce output sorted the same way.
26  *
27  *              MergeAppend nodes don't make use of their left and right
28  *              subtrees, rather they maintain a list of subplans so
29  *              a typical MergeAppend node looks like this in the plan tree:
30  *
31  *                                 ...
32  *                                 /
33  *                              MergeAppend---+------+------+--- nil
34  *                              /       \                 |              |              |
35  *                        nil   nil              ...    ...    ...
36  *                                                               subplans
37  */
38
39 #include "postgres.h"
40
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
43
44 /*
45  * It gets quite confusing having a heap array (indexed by integers) which
46  * contains integers which index into the slots array. These typedefs try to
47  * clear it up, but they're only documentation.
48  */
49 typedef int SlotNumber;
50 typedef int HeapPosition;
51
52 static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
53 static void heap_siftup_slot(MergeAppendState *node);
54 static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
55
56
57 /* ----------------------------------------------------------------
58  *              ExecInitMergeAppend
59  *
60  *              Begin all of the subscans of the MergeAppend node.
61  * ----------------------------------------------------------------
62  */
63 MergeAppendState *
64 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
65 {
66         MergeAppendState *mergestate = makeNode(MergeAppendState);
67         PlanState **mergeplanstates;
68         int                     nplans;
69         int                     i;
70         ListCell   *lc;
71
72         /* check for unsupported flags */
73         Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
74
75         /*
76          * Set up empty vector of subplan states
77          */
78         nplans = list_length(node->mergeplans);
79
80         mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
81
82         /*
83          * create new MergeAppendState for our node
84          */
85         mergestate->ps.plan = (Plan *) node;
86         mergestate->ps.state = estate;
87         mergestate->mergeplans = mergeplanstates;
88         mergestate->ms_nplans = nplans;
89
90         mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
91         mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
92
93         /*
94          * Miscellaneous initialization
95          *
96          * MergeAppend plans don't have expression contexts because they never
97          * call ExecQual or ExecProject.
98          */
99
100         /*
101          * MergeAppend nodes do have Result slots, which hold pointers to tuples,
102          * so we have to initialize them.
103          */
104         ExecInitResultTupleSlot(estate, &mergestate->ps);
105
106         /*
107          * call ExecInitNode on each of the plans to be executed and save the
108          * results into the array "mergeplans".
109          */
110         i = 0;
111         foreach(lc, node->mergeplans)
112         {
113                 Plan       *initNode = (Plan *) lfirst(lc);
114
115                 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
116                 i++;
117         }
118
119         /*
120          * initialize output tuple type
121          */
122         ExecAssignResultTypeFromTL(&mergestate->ps);
123         mergestate->ps.ps_ProjInfo = NULL;
124
125         /*
126          * initialize sort-key information
127          */
128         mergestate->ms_nkeys = node->numCols;
129         mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
130
131         for (i = 0; i < node->numCols; i++)
132         {
133                 SortSupport     sortKey = mergestate->ms_sortkeys + i;
134
135                 sortKey->ssup_cxt = CurrentMemoryContext;
136                 sortKey->ssup_collation = node->collations[i];
137                 sortKey->ssup_nulls_first = node->nullsFirst[i];
138                 sortKey->ssup_attno = node->sortColIdx[i];
139
140                 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
141         }
142
143         /*
144          * initialize to show we have not run the subplans yet
145          */
146         mergestate->ms_heap_size = 0;
147         mergestate->ms_initialized = false;
148         mergestate->ms_last_slot = -1;
149
150         return mergestate;
151 }
152
153 /* ----------------------------------------------------------------
154  *         ExecMergeAppend
155  *
156  *              Handles iteration over multiple subplans.
157  * ----------------------------------------------------------------
158  */
159 TupleTableSlot *
160 ExecMergeAppend(MergeAppendState *node)
161 {
162         TupleTableSlot *result;
163         SlotNumber      i;
164
165         if (!node->ms_initialized)
166         {
167                 /*
168                  * First time through: pull the first tuple from each subplan, and set
169                  * up the heap.
170                  */
171                 for (i = 0; i < node->ms_nplans; i++)
172                 {
173                         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
174                         if (!TupIsNull(node->ms_slots[i]))
175                                 heap_insert_slot(node, i);
176                 }
177                 node->ms_initialized = true;
178         }
179         else
180         {
181                 /*
182                  * Otherwise, pull the next tuple from whichever subplan we returned
183                  * from last time, and insert it into the heap.  (We could simplify
184                  * the logic a bit by doing this before returning from the prior call,
185                  * but it's better to not pull tuples until necessary.)
186                  */
187                 i = node->ms_last_slot;
188                 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
189                 if (!TupIsNull(node->ms_slots[i]))
190                         heap_insert_slot(node, i);
191         }
192
193         if (node->ms_heap_size > 0)
194         {
195                 /* Return the topmost heap node, and sift up the remaining nodes */
196                 i = node->ms_heap[0];
197                 result = node->ms_slots[i];
198                 node->ms_last_slot = i;
199                 heap_siftup_slot(node);
200         }
201         else
202         {
203                 /* All the subplans are exhausted, and so is the heap */
204                 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
205         }
206
207         return result;
208 }
209
210 /*
211  * Insert a new slot into the heap.  The slot must contain a valid tuple.
212  */
213 static void
214 heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
215 {
216         SlotNumber *heap = node->ms_heap;
217         HeapPosition j;
218
219         Assert(!TupIsNull(node->ms_slots[new_slot]));
220
221         j = node->ms_heap_size++;       /* j is where the "hole" is */
222         while (j > 0)
223         {
224                 int                     i = (j - 1) / 2;
225
226                 if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
227                         break;
228                 heap[j] = heap[i];
229                 j = i;
230         }
231         heap[j] = new_slot;
232 }
233
234 /*
235  * Delete the heap top (the slot in heap[0]), and sift up.
236  */
237 static void
238 heap_siftup_slot(MergeAppendState *node)
239 {
240         SlotNumber *heap = node->ms_heap;
241         HeapPosition i,
242                                 n;
243
244         if (--node->ms_heap_size <= 0)
245                 return;
246         n = node->ms_heap_size;         /* heap[n] needs to be reinserted */
247         i = 0;                                          /* i is where the "hole" is */
248         for (;;)
249         {
250                 int                     j = 2 * i + 1;
251
252                 if (j >= n)
253                         break;
254                 if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0)
255                         j++;
256                 if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
257                         break;
258                 heap[i] = heap[j];
259                 i = j;
260         }
261         heap[i] = heap[n];
262 }
263
264 /*
265  * Compare the tuples in the two given slots.
266  */
267 static int32
268 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
269 {
270         TupleTableSlot *s1 = node->ms_slots[slot1];
271         TupleTableSlot *s2 = node->ms_slots[slot2];
272         int                     nkey;
273
274         Assert(!TupIsNull(s1));
275         Assert(!TupIsNull(s2));
276
277         for (nkey = 0; nkey < node->ms_nkeys; nkey++)
278         {
279                 SortSupport     sortKey = node->ms_sortkeys + nkey;
280                 AttrNumber      attno = sortKey->ssup_attno;
281                 Datum           datum1,
282                                         datum2;
283                 bool            isNull1,
284                                         isNull2;
285                 int                     compare;
286
287                 datum1 = slot_getattr(s1, attno, &isNull1);
288                 datum2 = slot_getattr(s2, attno, &isNull2);
289
290                 compare = ApplySortComparator(datum1, isNull1,
291                                                                           datum2, isNull2,
292                                                                           sortKey);
293                 if (compare != 0)
294                         return compare;
295         }
296         return 0;
297 }
298
299 /* ----------------------------------------------------------------
300  *              ExecEndMergeAppend
301  *
302  *              Shuts down the subscans of the MergeAppend node.
303  *
304  *              Returns nothing of interest.
305  * ----------------------------------------------------------------
306  */
307 void
308 ExecEndMergeAppend(MergeAppendState *node)
309 {
310         PlanState **mergeplans;
311         int                     nplans;
312         int                     i;
313
314         /*
315          * get information from the node
316          */
317         mergeplans = node->mergeplans;
318         nplans = node->ms_nplans;
319
320         /*
321          * shut down each of the subscans
322          */
323         for (i = 0; i < nplans; i++)
324                 ExecEndNode(mergeplans[i]);
325 }
326
327 void
328 ExecReScanMergeAppend(MergeAppendState *node)
329 {
330         int                     i;
331
332         for (i = 0; i < node->ms_nplans; i++)
333         {
334                 PlanState  *subnode = node->mergeplans[i];
335
336                 /*
337                  * ExecReScan doesn't know about my subplans, so I have to do
338                  * changed-parameter signaling myself.
339                  */
340                 if (node->ps.chgParam != NULL)
341                         UpdateChangedParamSet(subnode, node->ps.chgParam);
342
343                 /*
344                  * If chgParam of subnode is not null then plan will be re-scanned by
345                  * first ExecProcNode.
346                  */
347                 if (subnode->chgParam == NULL)
348                         ExecReScan(subnode);
349         }
350         node->ms_heap_size = 0;
351         node->ms_initialized = false;
352         node->ms_last_slot = -1;
353 }