1 /*-------------------------------------------------------------------------
4 * routines to handle MergeAppend nodes.
6 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/executor/nodeMergeAppend.c
13 *-------------------------------------------------------------------------
16 * ExecInitMergeAppend - initialize the MergeAppend node
17 * ExecMergeAppend - retrieve the next tuple from the node
18 * ExecEndMergeAppend - shut down the MergeAppend node
19 * ExecReScanMergeAppend - rescan the MergeAppend node
22 * A MergeAppend node contains a list of one or more subplans.
23 * These are each expected to deliver tuples that are sorted according
24 * to a common sort key. The MergeAppend node merges these streams
25 * to produce output sorted the same way.
27 * MergeAppend nodes don't make use of their left and right
28 * subtrees, rather they maintain a list of subplans so
29 * a typical MergeAppend node looks like this in the plan tree:
33 * MergeAppend---+------+------+--- nil
41 #include "executor/execdebug.h"
42 #include "executor/nodeMergeAppend.h"
44 #include "lib/binaryheap.h"
47 * We have one slot for each item in the heap array. We use SlotNumber
48 * to store slot indexes. This doesn't actually provide any formal
49 * type-safety, but it makes the code more self-documenting.
51 typedef int32 SlotNumber;
53 static int heap_compare_slots(Datum a, Datum b, void *arg);
56 /* ----------------------------------------------------------------
59 * Begin all of the subscans of the MergeAppend node.
60 * ----------------------------------------------------------------
63 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
65 MergeAppendState *mergestate = makeNode(MergeAppendState);
66 PlanState **mergeplanstates;
71 /* check for unsupported flags */
72 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
75 * Set up empty vector of subplan states
77 nplans = list_length(node->mergeplans);
79 mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
82 * create new MergeAppendState for our node
84 mergestate->ps.plan = (Plan *) node;
85 mergestate->ps.state = estate;
86 mergestate->mergeplans = mergeplanstates;
87 mergestate->ms_nplans = nplans;
89 mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
90 mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
94 * Miscellaneous initialization
96 * MergeAppend plans don't have expression contexts because they never
97 * call ExecQual or ExecProject.
101 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
102 * so we have to initialize them.
104 ExecInitResultTupleSlot(estate, &mergestate->ps);
107 * call ExecInitNode on each of the plans to be executed and save the
108 * results into the array "mergeplans".
111 foreach(lc, node->mergeplans)
113 Plan *initNode = (Plan *) lfirst(lc);
115 mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
120 * initialize output tuple type
122 ExecAssignResultTypeFromTL(&mergestate->ps);
123 mergestate->ps.ps_ProjInfo = NULL;
126 * initialize sort-key information
128 mergestate->ms_nkeys = node->numCols;
129 mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
131 for (i = 0; i < node->numCols; i++)
133 SortSupport sortKey = mergestate->ms_sortkeys + i;
135 sortKey->ssup_cxt = CurrentMemoryContext;
136 sortKey->ssup_collation = node->collations[i];
137 sortKey->ssup_nulls_first = node->nullsFirst[i];
138 sortKey->ssup_attno = node->sortColIdx[i];
141 * It isn't feasible to perform abbreviated key conversion, since
142 * tuples are pulled into mergestate's binary heap as needed. It
143 * would likely be counter-productive to convert tuples into an
144 * abbreviated representation as they're pulled up, so opt out of that
145 * additional optimization entirely.
147 sortKey->abbreviate = false;
149 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
153 * initialize to show we have not run the subplans yet
155 mergestate->ms_initialized = false;
160 /* ----------------------------------------------------------------
163 * Handles iteration over multiple subplans.
164 * ----------------------------------------------------------------
167 ExecMergeAppend(MergeAppendState *node)
169 TupleTableSlot *result;
172 if (!node->ms_initialized)
175 * First time through: pull the first tuple from each subplan, and set
178 for (i = 0; i < node->ms_nplans; i++)
180 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
181 if (!TupIsNull(node->ms_slots[i]))
182 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
184 binaryheap_build(node->ms_heap);
185 node->ms_initialized = true;
190 * Otherwise, pull the next tuple from whichever subplan we returned
191 * from last time, and reinsert the subplan index into the heap,
192 * because it might now compare differently against the existing
193 * elements of the heap. (We could perhaps simplify the logic a bit
194 * by doing this before returning from the prior call, but it's better
195 * to not pull tuples until necessary.)
197 i = DatumGetInt32(binaryheap_first(node->ms_heap));
198 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
199 if (!TupIsNull(node->ms_slots[i]))
200 binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
202 (void) binaryheap_remove_first(node->ms_heap);
205 if (binaryheap_empty(node->ms_heap))
207 /* All the subplans are exhausted, and so is the heap */
208 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
212 i = DatumGetInt32(binaryheap_first(node->ms_heap));
213 result = node->ms_slots[i];
220 * Compare the tuples in the two given slots.
223 heap_compare_slots(Datum a, Datum b, void *arg)
225 MergeAppendState *node = (MergeAppendState *) arg;
226 SlotNumber slot1 = DatumGetInt32(a);
227 SlotNumber slot2 = DatumGetInt32(b);
229 TupleTableSlot *s1 = node->ms_slots[slot1];
230 TupleTableSlot *s2 = node->ms_slots[slot2];
233 Assert(!TupIsNull(s1));
234 Assert(!TupIsNull(s2));
236 for (nkey = 0; nkey < node->ms_nkeys; nkey++)
238 SortSupport sortKey = node->ms_sortkeys + nkey;
239 AttrNumber attno = sortKey->ssup_attno;
246 datum1 = slot_getattr(s1, attno, &isNull1);
247 datum2 = slot_getattr(s2, attno, &isNull2);
249 compare = ApplySortComparator(datum1, isNull1,
258 /* ----------------------------------------------------------------
261 * Shuts down the subscans of the MergeAppend node.
263 * Returns nothing of interest.
264 * ----------------------------------------------------------------
267 ExecEndMergeAppend(MergeAppendState *node)
269 PlanState **mergeplans;
274 * get information from the node
276 mergeplans = node->mergeplans;
277 nplans = node->ms_nplans;
280 * shut down each of the subscans
282 for (i = 0; i < nplans; i++)
283 ExecEndNode(mergeplans[i]);
287 ExecReScanMergeAppend(MergeAppendState *node)
291 for (i = 0; i < node->ms_nplans; i++)
293 PlanState *subnode = node->mergeplans[i];
296 * ExecReScan doesn't know about my subplans, so I have to do
297 * changed-parameter signaling myself.
299 if (node->ps.chgParam != NULL)
300 UpdateChangedParamSet(subnode, node->ps.chgParam);
303 * If chgParam of subnode is not null then plan will be re-scanned by
304 * first ExecProcNode.
306 if (subnode->chgParam == NULL)
309 binaryheap_reset(node->ms_heap);
310 node->ms_initialized = false;