1 /*-------------------------------------------------------------------------
4 * routines to handle MergeAppend nodes.
6 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/executor/nodeMergeAppend.c
13 *-------------------------------------------------------------------------
16 * ExecInitMergeAppend - initialize the MergeAppend node
17 * ExecMergeAppend - retrieve the next tuple from the node
18 * ExecEndMergeAppend - shut down the MergeAppend node
19 * ExecReScanMergeAppend - rescan the MergeAppend node
22 * A MergeAppend node contains a list of one or more subplans.
23 * These are each expected to deliver tuples that are sorted according
24 * to a common sort key. The MergeAppend node merges these streams
25 * to produce output sorted the same way.
27 * MergeAppend nodes don't make use of their left and right
28 * subtrees, rather they maintain a list of subplans so
29 * a typical MergeAppend node looks like this in the plan tree:
33 * MergeAppend---+------+------+--- nil
41 #include "access/nbtree.h"
42 #include "executor/execdebug.h"
43 #include "executor/nodeMergeAppend.h"
44 #include "utils/lsyscache.h"
47 * It gets quite confusing having a heap array (indexed by integers) which
48 * contains integers which index into the slots array. These typedefs try to
49 * clear it up, but they're only documentation.
/* SlotNumber: an index into ms_slots[], identifying one subplan's current slot. */
51 typedef int SlotNumber;
/* HeapPosition: presumably an index into ms_heap[]; no use of it is visible in this excerpt. */
52 typedef int HeapPosition;
/* Forward declarations of the file-local heap helpers defined later in this file. */
54 static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
55 static void heap_siftup_slot(MergeAppendState *node);
56 static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
59 /* ----------------------------------------------------------------
62 * Begin all of the subscans of the MergeAppend node.
63 * ----------------------------------------------------------------
/*
 * ExecInitMergeAppend
 *
 * Builds the MergeAppendState for a MergeAppend plan node: allocates the
 * per-subplan arrays (subplan states, tuple slots, heap), initializes each
 * subplan with ExecInitNode, and prepares one comparison scankey per sort
 * column.  The heap is left empty and ms_initialized is false, so no
 * subplan is run here.
 *
 * node   - the MergeAppend plan node being initialized
 * estate - executor state; stored in ps.state and handed to every subplan
 * eflags - executor flags, passed unchanged to every subplan;
 *          EXEC_FLAG_BACKWARD and EXEC_FLAG_MARK are asserted to be unset
 *
 * Raises an ERROR if a sort operator has no comparison function.
 *
 * NOTE(review): the local declarations and the return statement are not
 * visible in this excerpt; presumably the new MergeAppendState is
 * returned -- confirm against the full source.
 */
66 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
68 MergeAppendState *mergestate = makeNode(MergeAppendState);
69 PlanState **mergeplanstates;
74 /* check for unsupported flags */
75 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
78 * Set up empty vector of subplan states
80 nplans = list_length(node->mergeplans);
82 mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
85 * create new MergeAppendState for our node
87 mergestate->ps.plan = (Plan *) node;
88 mergestate->ps.state = estate;
89 mergestate->mergeplans = mergeplanstates;
90 mergestate->ms_nplans = nplans;
/* one slot pointer and one heap entry per subplan; palloc0 leaves both arrays zeroed */
92 mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
93 mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
96 * Miscellaneous initialization
98 * MergeAppend plans don't have expression contexts because they never
99 * call ExecQual or ExecProject.
103 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
104 * so we have to initialize them.
106 ExecInitResultTupleSlot(estate, &mergestate->ps);
109 * call ExecInitNode on each of the plans to be executed and save the
110 * results into the array "mergeplans".
113 foreach(lc, node->mergeplans)
115 Plan *initNode = (Plan *) lfirst(lc);
/* NOTE(review): i is presumably zeroed before and advanced inside this loop; neither statement is visible in this excerpt */
117 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
122 * initialize output tuple type
124 ExecAssignResultTypeFromTL(&mergestate->ps);
/* no projection info is set up for this node */
125 mergestate->ps.ps_ProjInfo = NULL;
128 * initialize sort-key information
130 mergestate->ms_nkeys = node->numCols;
131 mergestate->ms_scankeys = palloc0(sizeof(ScanKeyData) * node->numCols);
133 for (i = 0; i < node->numCols; i++)
/* look up the comparison function and direction for this column's sort operator; failure is reported as an ERROR */
138 if (!get_compare_function_for_ordering_op(node->sortOperators[i],
139 &sortFunction, &reverse))
140 elog(ERROR, "operator %u is not a valid ordering operator",
141 node->sortOperators[i]);
144 * We needn't fill in sk_strategy or sk_subtype since these scankeys
145 * will never be passed to an index.
147 ScanKeyInit(&mergestate->ms_scankeys[i],
153 /* However, we use btree's conventions for encoding directionality */
/* NOTE(review): presumably guarded by a test of "reverse" that is not visible in this excerpt */
155 mergestate->ms_scankeys[i].sk_flags |= SK_BT_DESC;
156 if (node->nullsFirst[i])
157 mergestate->ms_scankeys[i].sk_flags |= SK_BT_NULLS_FIRST;
161 * initialize to show we have not run the subplans yet
163 mergestate->ms_heap_size = 0;
164 mergestate->ms_initialized = false;
/* -1 is not a valid subplan index; ExecMergeAppend stores a real index here when it returns a tuple */
165 mergestate->ms_last_slot = -1;
170 /* ----------------------------------------------------------------
173 * Handles iteration over multiple subplans.
174 * ----------------------------------------------------------------
/*
 * ExecMergeAppend
 *
 * Produces the next tuple of the merged stream.
 *
 * On the first call (ms_initialized is false) one tuple is fetched from
 * every subplan, and each subplan that produced a tuple is inserted into
 * the heap.  On later calls only the subplan recorded in ms_last_slot is
 * advanced, and it is re-inserted into the heap if it produced a tuple.
 *
 * If the heap is non-empty, the slot referenced by heap[0] becomes the
 * result, its subplan index is saved in ms_last_slot, and the entry is
 * removed from the heap.  Otherwise the node's cleared result slot is the
 * result.
 *
 * NOTE(review): the else branches and the return statement are not
 * visible in this excerpt; presumably "result" is returned -- confirm.
 */
177 ExecMergeAppend(MergeAppendState *node)
179 TupleTableSlot *result;
182 if (!node->ms_initialized)
185 * First time through: pull the first tuple from each subplan,
186 * and set up the heap.
188 for (i = 0; i < node->ms_nplans; i++)
190 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
/* a subplan that returned no tuple is simply left out of the heap */
191 if (!TupIsNull(node->ms_slots[i]))
192 heap_insert_slot(node, i);
194 node->ms_initialized = true;
199 * Otherwise, pull the next tuple from whichever subplan we returned
200 * from last time, and insert it into the heap. (We could simplify
201 * the logic a bit by doing this before returning from the prior call,
202 * but it's better to not pull tuples until necessary.)
204 i = node->ms_last_slot;
205 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
206 if (!TupIsNull(node->ms_slots[i]))
207 heap_insert_slot(node, i);
210 if (node->ms_heap_size > 0)
212 /* Return the topmost heap node, and sift up the remaining nodes */
213 i = node->ms_heap[0];
214 result = node->ms_slots[i];
/* remember which subplan to advance on the next call */
215 node->ms_last_slot = i;
/* drop heap[0]; the slot pointer it referred to was already saved in result */
216 heap_siftup_slot(node);
220 /* All the subplans are exhausted, and so is the heap */
221 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
228 * Insert a new slot into the heap. The slot must contain a valid tuple.
/*
 * heap_insert_slot
 *
 * Adds slot number new_slot to node->ms_heap and increments ms_heap_size.
 * The slot at ms_slots[new_slot] must hold a non-empty tuple (asserted).
 *
 * NOTE(review): the loop surrounding the comparison and the computation
 * of i are not visible in this excerpt.  Presumably the hole at j is moved
 * toward the heap root until new_slot compares >= the entry at i, and
 * new_slot is then stored in the hole -- confirm against the full source.
 */
231 heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
233 SlotNumber *heap = node->ms_heap;
236 Assert(!TupIsNull(node->ms_slots[new_slot]));
238 j = node->ms_heap_size++; /* j is where the "hole" is */
/* new_slot does not sort before the entry at i: presumably the hole's final position has been found */
243 if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
252 * Delete the heap top (the slot in heap[0]), and sift up.
/*
 * heap_siftup_slot
 *
 * Removes the entry at heap[0].  ms_heap_size is decremented first; the
 * entry that was last in the heap, heap[n], then has to be given a new
 * position, starting from the hole at index 0.
 *
 * NOTE(review): the loop, the computation of j, and the statements
 * controlled by the tests below are not visible in this excerpt.
 * Presumably j indexes a child of the hole and the loop moves the hole
 * downward until heap[n] fits -- confirm against the full source.
 */
255 heap_siftup_slot(MergeAppendState *node)
257 SlotNumber *heap = node->ms_heap;
/* shrink the heap; when nothing remains there is presumably nothing to re-place (controlled statement not visible) */
261 if (--node->ms_heap_size <= 0)
263 n = node->ms_heap_size; /* heap[n] needs to be reinserted */
264 i = 0; /* i is where the "hole" is */
/* a second entry exists at j+1 and sorts before the one at j: presumably j+1 is the one selected */
271 if (j+1 < n && heap_compare_slots(node, heap[j], heap[j+1]) > 0)
/* heap[n] does not sort after the entry at j: presumably the current hole is its final position */
273 if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
282 * Compare the tuples in the two given slots.
/*
 * heap_compare_slots
 *
 * Compares the tuples held in ms_slots[slot1] and ms_slots[slot2], one
 * sort key (ms_scankeys entry) at a time.  Both slots must hold non-empty
 * tuples (asserted).
 *
 * Visible NULL handling for a key:
 *   - both values NULL: treated as equal, the next key is examined;
 *   - first NULL, second not: -1 if SK_BT_NULLS_FIRST is set, else 1;
 *   - first not NULL, second NULL: 1 if SK_BT_NULLS_FIRST is set, else -1.
 * Otherwise the key's comparison function (sk_func) is called.
 *
 * NOTE(review): the NULL-test conditions, the arguments of FunctionCall2,
 * the handling of its result (including the SK_BT_DESC case), and the
 * final return are not visible in this excerpt.  Presumably the result is
 * negative, zero or positive as slot1 sorts before, equal to, or after
 * slot2 -- confirm against the full source.
 */
285 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
287 TupleTableSlot *s1 = node->ms_slots[slot1];
288 TupleTableSlot *s2 = node->ms_slots[slot2];
291 Assert(!TupIsNull(s1));
292 Assert(!TupIsNull(s2));
294 for (nkey = 0; nkey < node->ms_nkeys; nkey++)
296 ScanKey scankey = node->ms_scankeys + nkey;
297 AttrNumber attno = scankey->sk_attno;
/* fetch the key column from both tuples, along with their NULL flags */
304 datum1 = slot_getattr(s1, attno, &isNull1);
305 datum2 = slot_getattr(s2, attno, &isNull2);
310 continue; /* NULL "=" NULL */
311 else if (scankey->sk_flags & SK_BT_NULLS_FIRST)
312 return -1; /* NULL "<" NOT_NULL */
314 return 1; /* NULL ">" NOT_NULL */
318 if (scankey->sk_flags & SK_BT_NULLS_FIRST)
319 return 1; /* NOT_NULL ">" NULL */
321 return -1; /* NOT_NULL "<" NULL */
/* presumably reached only when both values are non-NULL: call this key's comparison function */
325 compare = DatumGetInt32(FunctionCall2(&scankey->sk_func,
/* NOTE(review): for a descending key the result is presumably inverted; the controlled statement is not visible here */
329 if (scankey->sk_flags & SK_BT_DESC)
338 /* ----------------------------------------------------------------
341 * Shuts down the subscans of the MergeAppend node.
343 * Returns nothing of interest.
344 * ----------------------------------------------------------------
/*
 * ExecEndMergeAppend
 *
 * Calls ExecEndNode on each of the node's ms_nplans subplan states.
 *
 * NOTE(review): nothing visible here releases ms_slots, ms_heap or
 * ms_scankeys; presumably they are reclaimed together with the memory
 * context they were allocated in -- confirm.
 */
347 ExecEndMergeAppend(MergeAppendState *node)
349 PlanState **mergeplans;
354 * get information from the node
356 mergeplans = node->mergeplans;
357 nplans = node->ms_nplans;
360 * shut down each of the subscans
362 for (i = 0; i < nplans; i++)
363 ExecEndNode(mergeplans[i]);
/*
 * ExecReScanMergeAppend
 *
 * Prepares the node to be scanned again.  Changed-parameter information
 * from this node is propagated to each subplan.  The heap state is then
 * reset (empty heap, ms_initialized false, ms_last_slot -1), so the next
 * ExecMergeAppend call takes its first-time path and pulls a tuple from
 * every subplan again.
 */
367 ExecReScanMergeAppend(MergeAppendState *node)
371 for (i = 0; i < node->ms_nplans; i++)
373 PlanState *subnode = node->mergeplans[i];
376 * ExecReScan doesn't know about my subplans, so I have to do
377 * changed-parameter signaling myself.
379 if (node->ps.chgParam != NULL)
380 UpdateChangedParamSet(subnode, node->ps.chgParam);
383 * If chgParam of subnode is not null then plan will be re-scanned by
384 * first ExecProcNode.
/* NOTE(review): the controlled statement is not visible here; presumably the subplan is rescanned right away when it has no pending changed parameters */
386 if (subnode->chgParam == NULL)
389 node->ms_heap_size = 0;
390 node->ms_initialized = false;
391 node->ms_last_slot = -1;