1 /*-------------------------------------------------------------------------
4 * routines to handle MergeAppend nodes.
6 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/executor/nodeMergeAppend.c
13 *-------------------------------------------------------------------------
16 * ExecInitMergeAppend - initialize the MergeAppend node
17 * ExecMergeAppend - retrieve the next tuple from the node
18 * ExecEndMergeAppend - shut down the MergeAppend node
19 * ExecReScanMergeAppend - rescan the MergeAppend node
22 * A MergeAppend node contains a list of one or more subplans.
23 * These are each expected to deliver tuples that are sorted according
24 * to a common sort key. The MergeAppend node merges these streams
25 * to produce output sorted the same way.
27 * MergeAppend nodes don't make use of their left and right
28 * subtrees, rather they maintain a list of subplans so
29 * a typical MergeAppend node looks like this in the plan tree:
33 * MergeAppend---+------+------+--- nil
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
45 * It gets quite confusing having a heap array (indexed by integers) which
46 * contains integers which index into the slots array. These typedefs try to
47 * clear it up, but they're only documentation.
49 typedef int SlotNumber;
50 typedef int HeapPosition;
52 static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
53 static void heap_siftup_slot(MergeAppendState *node);
54 static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
57 /* ----------------------------------------------------------------
60 * Begin all of the subscans of the MergeAppend node.
61 * ----------------------------------------------------------------
64 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 MergeAppendState *mergestate = makeNode(MergeAppendState);
67 PlanState **mergeplanstates;
72 /* check for unsupported flags */
73 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76 * Set up empty vector of subplan states
78 nplans = list_length(node->mergeplans);
80 mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
83 * create new MergeAppendState for our node
85 mergestate->ps.plan = (Plan *) node;
86 mergestate->ps.state = estate;
87 mergestate->mergeplans = mergeplanstates;
88 mergestate->ms_nplans = nplans;
90 mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
91 mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
94 * Miscellaneous initialization
96 * MergeAppend plans don't have expression contexts because they never
97 * call ExecQual or ExecProject.
101 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
102 * so we have to initialize them.
104 ExecInitResultTupleSlot(estate, &mergestate->ps);
107 * call ExecInitNode on each of the plans to be executed and save the
108 * results into the array "mergeplans".
111 foreach(lc, node->mergeplans)
113 Plan *initNode = (Plan *) lfirst(lc);
115 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
120 * initialize output tuple type
122 ExecAssignResultTypeFromTL(&mergestate->ps);
123 mergestate->ps.ps_ProjInfo = NULL;
126 * initialize sort-key information
128 mergestate->ms_nkeys = node->numCols;
129 mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
131 for (i = 0; i < node->numCols; i++)
133 SortSupport sortKey = mergestate->ms_sortkeys + i;
135 sortKey->ssup_cxt = CurrentMemoryContext;
136 sortKey->ssup_collation = node->collations[i];
137 sortKey->ssup_nulls_first = node->nullsFirst[i];
138 sortKey->ssup_attno = node->sortColIdx[i];
140 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
144 * initialize to show we have not run the subplans yet
146 mergestate->ms_heap_size = 0;
147 mergestate->ms_initialized = false;
148 mergestate->ms_last_slot = -1;
153 /* ----------------------------------------------------------------
156 * Handles iteration over multiple subplans.
157 * ----------------------------------------------------------------
160 ExecMergeAppend(MergeAppendState *node)
162 TupleTableSlot *result;
165 if (!node->ms_initialized)
168 * First time through: pull the first tuple from each subplan, and set
171 for (i = 0; i < node->ms_nplans; i++)
173 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
174 if (!TupIsNull(node->ms_slots[i]))
175 heap_insert_slot(node, i);
177 node->ms_initialized = true;
182 * Otherwise, pull the next tuple from whichever subplan we returned
183 * from last time, and insert it into the heap. (We could simplify
184 * the logic a bit by doing this before returning from the prior call,
185 * but it's better to not pull tuples until necessary.)
187 i = node->ms_last_slot;
188 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
189 if (!TupIsNull(node->ms_slots[i]))
190 heap_insert_slot(node, i);
193 if (node->ms_heap_size > 0)
195 /* Return the topmost heap node, and sift up the remaining nodes */
196 i = node->ms_heap[0];
197 result = node->ms_slots[i];
198 node->ms_last_slot = i;
199 heap_siftup_slot(node);
203 /* All the subplans are exhausted, and so is the heap */
204 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
211 * Insert a new slot into the heap. The slot must contain a valid tuple.
214 heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
216 SlotNumber *heap = node->ms_heap;
219 Assert(!TupIsNull(node->ms_slots[new_slot]));
221 j = node->ms_heap_size++; /* j is where the "hole" is */
226 if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
235 * Delete the heap top (the slot in heap[0]), and sift up.
238 heap_siftup_slot(MergeAppendState *node)
240 SlotNumber *heap = node->ms_heap;
244 if (--node->ms_heap_size <= 0)
246 n = node->ms_heap_size; /* heap[n] needs to be reinserted */
247 i = 0; /* i is where the "hole" is */
254 if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0)
256 if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
265 * Compare the tuples in the two given slots.
268 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
270 TupleTableSlot *s1 = node->ms_slots[slot1];
271 TupleTableSlot *s2 = node->ms_slots[slot2];
274 Assert(!TupIsNull(s1));
275 Assert(!TupIsNull(s2));
277 for (nkey = 0; nkey < node->ms_nkeys; nkey++)
279 SortSupport sortKey = node->ms_sortkeys + nkey;
280 AttrNumber attno = sortKey->ssup_attno;
287 datum1 = slot_getattr(s1, attno, &isNull1);
288 datum2 = slot_getattr(s2, attno, &isNull2);
290 compare = ApplySortComparator(datum1, isNull1,
299 /* ----------------------------------------------------------------
302 * Shuts down the subscans of the MergeAppend node.
304 * Returns nothing of interest.
305 * ----------------------------------------------------------------
308 ExecEndMergeAppend(MergeAppendState *node)
310 PlanState **mergeplans;
315 * get information from the node
317 mergeplans = node->mergeplans;
318 nplans = node->ms_nplans;
321 * shut down each of the subscans
323 for (i = 0; i < nplans; i++)
324 ExecEndNode(mergeplans[i]);
328 ExecReScanMergeAppend(MergeAppendState *node)
332 for (i = 0; i < node->ms_nplans; i++)
334 PlanState *subnode = node->mergeplans[i];
337 * ExecReScan doesn't know about my subplans, so I have to do
338 * changed-parameter signaling myself.
340 if (node->ps.chgParam != NULL)
341 UpdateChangedParamSet(subnode, node->ps.chgParam);
344 * If chgParam of subnode is not null then plan will be re-scanned by
345 * first ExecProcNode.
347 if (subnode->chgParam == NULL)
350 node->ms_heap_size = 0;
351 node->ms_initialized = false;
352 node->ms_last_slot = -1;