#include "executor/execdebug.h"
#include "executor/nodeMergeAppend.h"
+#include "lib/binaryheap.h"
+
/*
- * It gets quite confusing having a heap array (indexed by integers) which
- * contains integers which index into the slots array. These typedefs try to
- * clear it up, but they're only documentation.
+ * We have one slot for each item in the heap array. We use SlotNumber
+ * to store slot indexes. This doesn't actually provide any formal
+ * type-safety, but it makes the code more self-documenting.
*/
-typedef int SlotNumber;
-typedef int HeapPosition;
+typedef int32 SlotNumber;
-static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
-static void heap_siftup_slot(MergeAppendState *node);
-static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
+static int heap_compare_slots(Datum a, Datum b, void *arg);
/* ----------------------------------------------------------------
mergestate->ms_nplans = nplans;
mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
- mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
+ mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
+ mergestate);
/*
* Miscellaneous initialization
/*
* initialize to show we have not run the subplans yet
*/
- mergestate->ms_heap_size = 0;
mergestate->ms_initialized = false;
- mergestate->ms_last_slot = -1;
return mergestate;
}
{
node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
if (!TupIsNull(node->ms_slots[i]))
- heap_insert_slot(node, i);
+ binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
}
+ binaryheap_build(node->ms_heap);
node->ms_initialized = true;
}
else
{
/*
* Otherwise, pull the next tuple from whichever subplan we returned
- * from last time, and insert it into the heap. (We could simplify
- * the logic a bit by doing this before returning from the prior call,
- * but it's better to not pull tuples until necessary.)
+ * from last time, and reinsert the subplan index into the heap,
+ * because it might now compare differently against the existing
+ * elements of the heap. (We could perhaps simplify the logic a bit
+ * by doing this before returning from the prior call, but it's better
+ * to not pull tuples until necessary.)
*/
- i = node->ms_last_slot;
+ i = DatumGetInt32(binaryheap_first(node->ms_heap));
node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
if (!TupIsNull(node->ms_slots[i]))
- heap_insert_slot(node, i);
+ binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
+ else
+ (void) binaryheap_remove_first(node->ms_heap);
}
- if (node->ms_heap_size > 0)
- {
- /* Return the topmost heap node, and sift up the remaining nodes */
- i = node->ms_heap[0];
- result = node->ms_slots[i];
- node->ms_last_slot = i;
- heap_siftup_slot(node);
- }
- else
+ if (binaryheap_empty(node->ms_heap))
{
/* All the subplans are exhausted, and so is the heap */
result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
-
- return result;
-}
-
-/*
- * Insert a new slot into the heap. The slot must contain a valid tuple.
- */
-static void
-heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
-{
- SlotNumber *heap = node->ms_heap;
- HeapPosition j;
-
- Assert(!TupIsNull(node->ms_slots[new_slot]));
-
- j = node->ms_heap_size++; /* j is where the "hole" is */
- while (j > 0)
+ else
{
- int i = (j - 1) / 2;
-
- if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
- break;
- heap[j] = heap[i];
- j = i;
+ i = DatumGetInt32(binaryheap_first(node->ms_heap));
+ result = node->ms_slots[i];
}
- heap[j] = new_slot;
-}
-/*
- * Delete the heap top (the slot in heap[0]), and sift up.
- */
-static void
-heap_siftup_slot(MergeAppendState *node)
-{
- SlotNumber *heap = node->ms_heap;
- HeapPosition i,
- n;
-
- if (--node->ms_heap_size <= 0)
- return;
- n = node->ms_heap_size; /* heap[n] needs to be reinserted */
- i = 0; /* i is where the "hole" is */
- for (;;)
- {
- int j = 2 * i + 1;
-
- if (j >= n)
- break;
- if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0)
- j++;
- if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
- break;
- heap[i] = heap[j];
- i = j;
- }
- heap[i] = heap[n];
+ return result;
}
/*
* Compare the tuples in the two given slots.
*/
static int32
-heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
+heap_compare_slots(Datum a, Datum b, void *arg)
{
+ MergeAppendState *node = (MergeAppendState *) arg;
+ SlotNumber slot1 = DatumGetInt32(a);
+ SlotNumber slot2 = DatumGetInt32(b);
+
TupleTableSlot *s1 = node->ms_slots[slot1];
TupleTableSlot *s2 = node->ms_slots[slot2];
int nkey;
datum2, isNull2,
sortKey);
if (compare != 0)
- return compare;
+ return -compare;
}
return 0;
}
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
- node->ms_heap_size = 0;
node->ms_initialized = false;
- node->ms_last_slot = -1;
}
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = ilist.o stringinfo.o
+OBJS = ilist.o binaryheap.o stringinfo.o
include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * binaryheap.c
+ * A simple binary heap implementation
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/lib/binaryheap.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <math.h>
+
+#include "lib/binaryheap.h"
+
+static void sift_down(binaryheap *heap, int node_off);
+static void sift_up(binaryheap *heap, int node_off);
+static inline void swap_nodes(binaryheap *heap, int a, int b);
+
+/*
+ * binaryheap_allocate
+ *
+ * Returns a pointer to a newly-allocated heap that has the capacity to
+ * store the given number of nodes, with the heap property defined by
+ * the given comparator function, which will be invoked with the additional
+ * argument specified by 'arg'.
+ *
+ * The header and the node array come from a single palloc() request.  The
+ * returned heap is empty and is flagged as satisfying the heap property.
+ */
+binaryheap *
+binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
+{
+	Size		sz;
+	binaryheap *heap;
+
+	Assert(capacity >= 0);
+
+	/*
+	 * Compute the request size in Size rather than int.  The product
+	 * sizeof(Datum) * capacity can exceed INT_MAX; storing it in an int
+	 * could truncate it and leave us with fewer nodes than bh_space claims.
+	 */
+	sz = offsetof(binaryheap, bh_nodes) + sizeof(Datum) * capacity;
+	heap = palloc(sz);
+	heap->bh_size = 0;
+	heap->bh_space = capacity;
+	heap->bh_has_heap_property = true;
+	heap->bh_compare = compare;
+	heap->bh_arg = arg;
+
+	return heap;
+}
+
+/*
+ * binaryheap_free
+ *
+ * Releases memory used by the given binaryheap.
+ *
+ * Only the heap structure itself is handed to pfree(); whatever the stored
+ * Datums may refer to is left untouched.  The heap pointer must not be used
+ * again after this call.
+ */
+void
+binaryheap_free(binaryheap *heap)
+{
+ pfree(heap);
+}
+
+/*
+ * These utility functions return the offset of the left child, right
+ * child, and parent of the node at the given index, respectively.
+ *
+ * The heap is represented as an array of nodes, with the root node
+ * stored at index 0. The left child of node i is at index 2*i+1, and
+ * the right child at 2*i+2. The parent of node i is at index (i-1)/2.
+ */
+
+static inline int
+left_offset(int i)
+{
+	/* first of the two child slots belonging to node i */
+	return (i * 2) + 1;
+}
+
+static inline int
+right_offset(int i)
+{
+	/* second of the two child slots belonging to node i */
+	return (i * 2) + 2;
+}
+
+static inline int
+parent_offset(int i)
+{
+	int			above = i - 1;
+
+	/* integer division maps both children back onto the same parent */
+	return above / 2;
+}
+
+/*
+ * binaryheap_add_unordered
+ *
+ * Appends the given datum after the last occupied node, in O(1), without
+ * restoring the heap property.  Intended for bulk-loading a fresh heap:
+ * add every element this way, then call binaryheap_build() once to turn
+ * the array into a valid heap.
+ *
+ * Raises an ERROR if the heap is already filled to capacity.
+ */
+void
+binaryheap_add_unordered(binaryheap *heap, Datum d)
+{
+	int			slot = heap->bh_size;
+
+	if (slot >= heap->bh_space)
+		elog(ERROR, "out of binary heap slots");
+
+	heap->bh_nodes[slot] = d;
+	heap->bh_size = slot + 1;
+
+	/* the array is no longer known to be heap-ordered */
+	heap->bh_has_heap_property = false;
+}
+
+/*
+ * binaryheap_build
+ *
+ * Turns the nodes collected by binaryheap_add_unordered() into a valid
+ * heap in O(n).  Not needed when elements are added with binaryheap_add().
+ */
+void
+binaryheap_build(binaryheap *heap)
+{
+	/*
+	 * Start at the parent of the last node and walk back to the root,
+	 * sifting each node down.  For an empty heap the starting index is
+	 * negative, so the loop body never runs.
+	 */
+	int			i = parent_offset(heap->bh_size - 1);
+
+	while (i >= 0)
+	{
+		sift_down(heap, i);
+		i--;
+	}
+
+	heap->bh_has_heap_property = true;
+}
+
+/*
+ * binaryheap_add
+ *
+ * Inserts the given datum into the heap in O(log n) time, keeping the
+ * heap property intact.
+ *
+ * Raises an ERROR if the heap is already filled to capacity.
+ */
+void
+binaryheap_add(binaryheap *heap, Datum d)
+{
+	int			slot = heap->bh_size;
+
+	if (slot >= heap->bh_space)
+		elog(ERROR, "out of binary heap slots");
+
+	/* place the new node after the last one, then let it climb */
+	heap->bh_nodes[slot] = d;
+	heap->bh_size = slot + 1;
+	sift_up(heap, slot);
+}
+
+/*
+ * binaryheap_first
+ *
+ * Returns the Datum stored in the first (root, topmost) node of the heap
+ * without modifying the heap. The caller must ensure that this
+ * routine is not used on an empty heap. Always O(1).
+ *
+ * The Assert also checks that the heap is currently flagged as having the
+ * heap property, i.e. binaryheap_build() has been called after any
+ * binaryheap_add_unordered().
+ */
+Datum
+binaryheap_first(binaryheap *heap)
+{
+ Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+ return heap->bh_nodes[0];
+}
+
+/*
+ * binaryheap_remove_first
+ *
+ * Removes the first (root, topmost) node from the heap and returns the
+ * Datum it held, after restoring the heap property among the remaining
+ * nodes.  The caller must ensure that this routine is not used on an
+ * empty heap.  O(log n) worst case.
+ */
+Datum
+binaryheap_remove_first(binaryheap *heap)
+{
+	int			last;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+
+	/* index of the final live node; it is also the size after removal */
+	last = heap->bh_size - 1;
+
+	if (last > 0)
+	{
+		/*
+		 * Exchange the root with the final node so that the old root ends
+		 * up just past the shrunken heap, then sift the new root down to
+		 * its proper place.
+		 */
+		swap_nodes(heap, 0, last);
+		heap->bh_size = last;
+		sift_down(heap, 0);
+	}
+	else
+	{
+		/* the root was the only node; nothing is left to reorder */
+		heap->bh_size = last;
+	}
+
+	/* the removed value now sits at index 'last', outside the live range */
+	return heap->bh_nodes[last];
+}
+
+/*
+ * binaryheap_replace_first
+ *
+ * Overwrites the topmost element of a non-empty heap with the given datum
+ * and restores the heap property.  O(1) in the best case, or O(log n) when
+ * the new value has to be sifted down.
+ */
+void
+binaryheap_replace_first(binaryheap *heap, Datum d)
+{
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+
+	heap->bh_nodes[0] = d;
+
+	/* a lone root has no children it could be out of order with */
+	if (heap->bh_size <= 1)
+		return;
+
+	sift_down(heap, 0);
+}
+
+/*
+ * Exchange the Datums stored at node offsets 'a' and 'b'.
+ */
+static inline void
+swap_nodes(binaryheap *heap, int a, int b)
+{
+	Datum	   *nodes = heap->bh_nodes;
+	Datum		held = nodes[b];
+
+	nodes[b] = nodes[a];
+	nodes[a] = held;
+}
+
+/*
+ * Move the node at 'node_off' towards the root for as long as the
+ * comparator ranks it above its parent.
+ */
+static void
+sift_up(binaryheap *heap, int node_off)
+{
+	int			cur = node_off;
+
+	while (cur != 0)
+	{
+		int			parent = parent_offset(cur);
+
+		/*
+		 * Once the node no longer compares greater than its parent, the
+		 * heap condition holds along this path and we can stop.
+		 */
+		if (heap->bh_compare(heap->bh_nodes[cur],
+							 heap->bh_nodes[parent],
+							 heap->bh_arg) <= 0)
+			break;
+
+		/* otherwise trade places with the parent and keep climbing */
+		swap_nodes(heap, cur, parent);
+		cur = parent;
+	}
+}
+
+/*
+ * Move the node at 'node_off' towards the leaves until neither of its
+ * children compares greater than it.
+ */
+static void
+sift_down(binaryheap *heap, int node_off)
+{
+	for (;;)
+	{
+		int			left = left_offset(node_off);
+		int			right = right_offset(node_off);
+
+		/* 0 means "no child chosen"; a child offset is never 0 */
+		int			target = 0;
+
+		/* left child exists and outranks the current node? */
+		if (left < heap->bh_size &&
+			heap->bh_compare(heap->bh_nodes[node_off],
+							 heap->bh_nodes[left],
+							 heap->bh_arg) < 0)
+			target = left;
+
+		/*
+		 * Right child exists and outranks the current node?  Prefer it
+		 * when the left child was not chosen, or when it also outranks
+		 * the left child.
+		 */
+		if (right < heap->bh_size &&
+			heap->bh_compare(heap->bh_nodes[node_off],
+							 heap->bh_nodes[right],
+							 heap->bh_arg) < 0 &&
+			(target == 0 ||
+			 heap->bh_compare(heap->bh_nodes[left],
+							  heap->bh_nodes[right],
+							  heap->bh_arg) < 0))
+			target = right;
+
+		/* no child outranks this node: the heap condition holds here */
+		if (target == 0)
+			return;
+
+		/* trade places with the chosen child and continue from there */
+		swap_nodes(heap, target, node_off);
+		node_off = target;
+	}
+}
--- /dev/null
+/*
+ * binaryheap.h
+ *
+ * A simple binary heap implementation
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/lib/binaryheap.h
+ */
+
+#ifndef BINARYHEAP_H
+#define BINARYHEAP_H
+
+/*
+ * Comparison callback defining the heap order.
+ *
+ * For a max-heap, the comparator must return <0 iff a < b, 0 iff a == b,
+ * and >0 iff a > b. For a min-heap, the conditions are reversed.
+ *
+ * Put differently: the element that the comparator ranks greatest is the
+ * one kept at the root. 'arg' is the pointer that was supplied to
+ * binaryheap_allocate(), passed through unchanged on every call.
+ */
+typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg);
+
+/*
+ * binaryheap
+ *
+ * A fixed-capacity binary heap of Datums, stored as a flat array with the
+ * root at index 0.
+ *
+ * bh_size                how many nodes are currently in "nodes"
+ * bh_space               how many nodes can be stored in "nodes"
+ * bh_has_heap_property   no unordered operations since last heap build;
+ *                        cleared by binaryheap_add_unordered(), set again
+ *                        by binaryheap_build(), and checked only in Asserts
+ * bh_compare             comparison function to define the heap property
+ * bh_arg                 user data for comparison function
+ * bh_nodes               variable-length array of "space" nodes
+ */
+typedef struct binaryheap
+{
+ int bh_size;
+ int bh_space;
+ bool bh_has_heap_property; /* debugging cross-check */
+ binaryheap_comparator bh_compare;
+ void *bh_arg;
+ Datum bh_nodes[FLEXIBLE_ARRAY_MEMBER];
+} binaryheap;
+
+extern binaryheap *binaryheap_allocate(int capacity,
+ binaryheap_comparator compare,
+ void *arg);
+extern void binaryheap_free(binaryheap *heap);
+extern void binaryheap_add_unordered(binaryheap *heap, Datum d);
+extern void binaryheap_build(binaryheap *heap);
+extern void binaryheap_add(binaryheap *heap, Datum d);
+extern Datum binaryheap_first(binaryheap *heap);
+extern Datum binaryheap_remove_first(binaryheap *heap);
+extern void binaryheap_replace_first(binaryheap *heap, Datum d);
+
+/* True when the heap currently holds no nodes. */
+#define binaryheap_empty(h) ((h)->bh_size == 0)
+
+#endif /* BINARYHEAP_H */
* nkeys number of sort key columns
* sortkeys sort keys in SortSupport representation
* slots current output tuple of each subplan
- * heap heap of active tuples (represented as array indexes)
- * heap_size number of active heap entries
+ * heap heap of active tuples
* initialized true if we have fetched first tuple from each subplan
- * last_slot last subplan fetched from (which must be re-called)
* ----------------
*/
typedef struct MergeAppendState
int ms_nkeys;
SortSupport ms_sortkeys; /* array of length ms_nkeys */
TupleTableSlot **ms_slots; /* array of length ms_nplans */
- int *ms_heap; /* array of length ms_nplans */
- int ms_heap_size; /* current active length of ms_heap[] */
+ struct binaryheap *ms_heap; /* binary heap of slot indices */
bool ms_initialized; /* are subplans started? */
- int ms_last_slot; /* last subplan slot we returned from */
} MergeAppendState;
/* ----------------