]> granicus.if.org Git - postgresql/commitdiff
Basic binary heap implementation.
authorRobert Haas <rhaas@postgresql.org>
Thu, 29 Nov 2012 16:13:08 +0000 (11:13 -0500)
committerRobert Haas <rhaas@postgresql.org>
Thu, 29 Nov 2012 16:16:59 +0000 (11:16 -0500)
There are probably other places where this can be used, but for now,
this just makes MergeAppend use it, so that this code will have test
coverage.  There is other work in the queue that will use this, as
well.

Abhijit Menon-Sen, reviewed by Andres Freund, Robert Haas, Álvaro
Herrera, Tom Lane, and others.

src/backend/executor/nodeMergeAppend.c
src/backend/lib/Makefile
src/backend/lib/binaryheap.c [new file with mode: 0644]
src/include/lib/binaryheap.h [new file with mode: 0644]
src/include/nodes/execnodes.h

index d5141ba54e2433b82c0881e1e74f5900aaad085e..9dc25eefc425df609bb1d7129341285d6dd5d8f0 100644 (file)
 #include "executor/execdebug.h"
 #include "executor/nodeMergeAppend.h"
 
+#include "lib/binaryheap.h"
+
 /*
- * It gets quite confusing having a heap array (indexed by integers) which
- * contains integers which index into the slots array. These typedefs try to
- * clear it up, but they're only documentation.
+ * We have one slot for each item in the heap array.  We use SlotNumber
+ * to store slot indexes.  This doesn't actually provide any formal
+ * type-safety, but it makes the code more self-documenting.
  */
-typedef int SlotNumber;
-typedef int HeapPosition;
+typedef int32 SlotNumber;
 
-static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
-static void heap_siftup_slot(MergeAppendState *node);
-static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
+static int     heap_compare_slots(Datum a, Datum b, void *arg);
 
 
 /* ----------------------------------------------------------------
@@ -88,7 +87,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
        mergestate->ms_nplans = nplans;
 
        mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
-       mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
+       mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
+                                                                                         mergestate);
 
        /*
         * Miscellaneous initialization
@@ -143,9 +143,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
        /*
         * initialize to show we have not run the subplans yet
         */
-       mergestate->ms_heap_size = 0;
        mergestate->ms_initialized = false;
-       mergestate->ms_last_slot = -1;
 
        return mergestate;
 }
@@ -172,101 +170,53 @@ ExecMergeAppend(MergeAppendState *node)
                {
                        node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
                        if (!TupIsNull(node->ms_slots[i]))
-                               heap_insert_slot(node, i);
+                               binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
                }
+               binaryheap_build(node->ms_heap);
                node->ms_initialized = true;
        }
        else
        {
                /*
                 * Otherwise, pull the next tuple from whichever subplan we returned
-                * from last time, and insert it into the heap.  (We could simplify
-                * the logic a bit by doing this before returning from the prior call,
-                * but it's better to not pull tuples until necessary.)
+                * from last time, and reinsert the subplan index into the heap,
+                * because it might now compare differently against the existing
+                * elements of the heap.  (We could perhaps simplify the logic a bit
+                * by doing this before returning from the prior call, but it's better
+                * to not pull tuples until necessary.)
                 */
-               i = node->ms_last_slot;
+               i = DatumGetInt32(binaryheap_first(node->ms_heap));
                node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
                if (!TupIsNull(node->ms_slots[i]))
-                       heap_insert_slot(node, i);
+                       binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
+               else
+                       (void) binaryheap_remove_first(node->ms_heap);
        }
 
-       if (node->ms_heap_size > 0)
-       {
-               /* Return the topmost heap node, and sift up the remaining nodes */
-               i = node->ms_heap[0];
-               result = node->ms_slots[i];
-               node->ms_last_slot = i;
-               heap_siftup_slot(node);
-       }
-       else
+       if (binaryheap_empty(node->ms_heap))
        {
                /* All the subplans are exhausted, and so is the heap */
                result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
        }
-
-       return result;
-}
-
-/*
- * Insert a new slot into the heap.  The slot must contain a valid tuple.
- */
-static void
-heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
-{
-       SlotNumber *heap = node->ms_heap;
-       HeapPosition j;
-
-       Assert(!TupIsNull(node->ms_slots[new_slot]));
-
-       j = node->ms_heap_size++;       /* j is where the "hole" is */
-       while (j > 0)
+       else
        {
-               int                     i = (j - 1) / 2;
-
-               if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
-                       break;
-               heap[j] = heap[i];
-               j = i;
+               i = DatumGetInt32(binaryheap_first(node->ms_heap));
+               result = node->ms_slots[i];
        }
-       heap[j] = new_slot;
-}
 
-/*
- * Delete the heap top (the slot in heap[0]), and sift up.
- */
-static void
-heap_siftup_slot(MergeAppendState *node)
-{
-       SlotNumber *heap = node->ms_heap;
-       HeapPosition i,
-                               n;
-
-       if (--node->ms_heap_size <= 0)
-               return;
-       n = node->ms_heap_size;         /* heap[n] needs to be reinserted */
-       i = 0;                                          /* i is where the "hole" is */
-       for (;;)
-       {
-               int                     j = 2 * i + 1;
-
-               if (j >= n)
-                       break;
-               if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0)
-                       j++;
-               if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
-                       break;
-               heap[i] = heap[j];
-               i = j;
-       }
-       heap[i] = heap[n];
+       return result;
 }
 
 /*
  * Compare the tuples in the two given slots.
  */
 static int32
-heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
+heap_compare_slots(Datum a, Datum b, void *arg)
 {
+       MergeAppendState *node = (MergeAppendState *) arg;
+       SlotNumber      slot1 = DatumGetInt32(a);
+       SlotNumber      slot2 = DatumGetInt32(b);
+
        TupleTableSlot *s1 = node->ms_slots[slot1];
        TupleTableSlot *s2 = node->ms_slots[slot2];
        int                     nkey;
@@ -291,7 +241,7 @@ heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
                                                                          datum2, isNull2,
                                                                          sortKey);
                if (compare != 0)
-                       return compare;
+                       return -compare;
        }
        return 0;
 }
@@ -347,7 +297,5 @@ ExecReScanMergeAppend(MergeAppendState *node)
                if (subnode->chgParam == NULL)
                        ExecReScan(subnode);
        }
-       node->ms_heap_size = 0;
        node->ms_initialized = false;
-       node->ms_last_slot = -1;
 }
index 98ce3d7e4adf5822397436aa3e7810578c66dd2f..327a1bc16d83dc15baf1e296e0c43c0ec46e861e 100644 (file)
@@ -12,6 +12,6 @@ subdir = src/backend/lib
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = ilist.o stringinfo.o
+OBJS = ilist.o binaryheap.o stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c
new file mode 100644 (file)
index 0000000..73c80e4
--- /dev/null
@@ -0,0 +1,293 @@
+/*-------------------------------------------------------------------------
+ *
+ * binaryheap.c
+ *       A simple binary heap implementaion
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/lib/binaryheap.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <math.h>
+
+#include "lib/binaryheap.h"
+
+static void sift_down(binaryheap *heap, int node_off);
+static void sift_up(binaryheap *heap, int node_off);
+static inline void swap_nodes(binaryheap *heap, int a, int b);
+
+/*
+ * binaryheap_allocate
+ *
+ * Returns a pointer to a newly-allocated heap that has the capacity to
+ * store the given number of nodes, with the heap property defined by
+ * the given comparator function, which will be invoked with the additional
+ * argument specified by 'arg'.
+ */
+binaryheap *
+binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
+{
+       int                     sz;
+       binaryheap *heap;
+
+       sz = offsetof(binaryheap, bh_nodes) + sizeof(Datum) * capacity;
+       heap = palloc(sz);
+       heap->bh_size = 0;
+       heap->bh_space = capacity;
+       heap->bh_has_heap_property = true;
+       heap->bh_compare = compare;
+       heap->bh_arg = arg;
+
+       return heap;
+}
+
+/*
+ * binaryheap_free
+ *
+ * Releases memory used by the given binaryheap.
+ */
+void
+binaryheap_free(binaryheap *heap)
+{
+       pfree(heap);
+}
+
+/*
+ * These utility functions return the offset of the left child, right
+ * child, and parent of the node at the given index, respectively.
+ *
+ * The heap is represented as an array of nodes, with the root node
+ * stored at index 0. The left child of node i is at index 2*i+1, and
+ * the right child at 2*i+2. The parent of node i is at index (i-1)/2.
+ */
+
+static inline int
+left_offset(int i)
+{
+       return 2 * i + 1;
+}
+
+static inline int
+right_offset(int i)
+{
+       return 2 * i + 2;
+}
+
+static inline int
+parent_offset(int i)
+{
+       return (i - 1) / 2;
+}
+
+/*
+ * binaryheap_add_unordered
+ *
+ * Adds the given datum to the end of the heap's list of nodes in O(1) without
+ * preserving the heap property. This is a convenience to add elements quickly
+ * to a new heap. To obtain a valid heap, one must call binaryheap_build()
+ * afterwards.
+ */
+void
+binaryheap_add_unordered(binaryheap *heap, Datum d)
+{
+       if (heap->bh_size >= heap->bh_space)
+               elog(ERROR, "out of binary heap slots");
+       heap->bh_has_heap_property = false;
+       heap->bh_nodes[heap->bh_size] = d;
+       heap->bh_size++;
+}
+
+/*
+ * binaryheap_build
+ *
+ * Assembles a valid heap in O(n) from the nodes added by
+ * binaryheap_add_unordered(). Not needed otherwise.
+ */
+void
+binaryheap_build(binaryheap *heap)
+{
+       int                     i;
+
+       for (i = parent_offset(heap->bh_size - 1); i >= 0; i--)
+               sift_down(heap, i);
+       heap->bh_has_heap_property = true;
+}
+
+/*
+ * binaryheap_add
+ *
+ * Adds the given datum to the heap in O(log n) time, while preserving
+ * the heap property.
+ */
+void
+binaryheap_add(binaryheap *heap, Datum d)
+{
+       if (heap->bh_size >= heap->bh_space)
+               elog(ERROR, "out of binary heap slots");
+       heap->bh_nodes[heap->bh_size] = d;
+       heap->bh_size++;
+       sift_up(heap, heap->bh_size - 1);
+}
+
+/*
+ * binaryheap_first
+ *
+ * Returns a pointer to the first (root, topmost) node in the heap
+ * without modifying the heap. The caller must ensure that this
+ * routine is not used on an empty heap. Always O(1).
+ */
+Datum
+binaryheap_first(binaryheap *heap)
+{
+       Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+       return heap->bh_nodes[0];
+}
+
+/*
+ * binaryheap_remove_first
+ *
+ * Removes the first (root, topmost) node in the heap and returns a
+ * pointer to it after rebalancing the heap. The caller must ensure
+ * that this routine is not used on an empty heap. O(log n) worst
+ * case.
+ */
+Datum
+binaryheap_remove_first(binaryheap *heap)
+{
+       Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+
+       if (heap->bh_size == 1)
+       {
+               heap->bh_size--;
+               return heap->bh_nodes[0];
+       }
+
+       /*
+        * Swap the root and last nodes, decrease the size of the heap (i.e.
+        * remove the former root node) and sift the new root node down to its
+        * correct position.
+        */
+       swap_nodes(heap, 0, heap->bh_size - 1);
+       heap->bh_size--;
+       sift_down(heap, 0);
+
+       return heap->bh_nodes[heap->bh_size];
+}
+
+/*
+ * binaryheap_replace_first
+ *
+ * Replace the topmost element of a non-empty heap, preserving the heap
+ * property.  O(1) in the best case, or O(log n) if it must fall back to
+ * sifting the new node down.
+ */
+void
+binaryheap_replace_first(binaryheap *heap, Datum d)
+{
+       Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+
+       heap->bh_nodes[0] = d;
+
+       if (heap->bh_size > 1)
+               sift_down(heap, 0);
+}
+
+/*
+ * Swap the contents of two nodes.
+ */
+static inline void
+swap_nodes(binaryheap *heap, int a, int b)
+{
+       Datum   swap;
+
+       swap = heap->bh_nodes[a];
+       heap->bh_nodes[a] = heap->bh_nodes[b];
+       heap->bh_nodes[b] = swap;
+}
+
+/*
+ * Sift a node up to the highest position it can hold according to the
+ * comparator.
+ */
+static void
+sift_up(binaryheap *heap, int node_off)
+{
+       while (node_off != 0)
+       {
+               int                     cmp;
+               int                     parent_off;
+
+               /*
+                * If this node is smaller than its parent, the heap condition is
+                * satisfied, and we're done.
+                */
+               parent_off = parent_offset(node_off);
+               cmp = heap->bh_compare(heap->bh_nodes[node_off],
+                                                          heap->bh_nodes[parent_off],
+                                                          heap->bh_arg);
+               if (cmp <= 0)
+                       break;
+
+               /*
+                * Otherwise, swap the node and its parent and go on to check the
+                * node's new parent.
+                */
+               swap_nodes(heap, node_off, parent_off);
+               node_off = parent_off;
+       }
+}
+
+/*
+ * Sift a node down from its current position to satisfy the heap
+ * property.
+ */
+static void
+sift_down(binaryheap *heap, int node_off)
+{
+       while (true)
+       {
+               int                     left_off = left_offset(node_off);
+               int                     right_off = right_offset(node_off);
+               int                     swap_off = 0;
+
+               /* Is the left child larger than the parent? */
+               if (left_off < heap->bh_size &&
+                       heap->bh_compare(heap->bh_nodes[node_off],
+                                                        heap->bh_nodes[left_off],
+                                                        heap->bh_arg) < 0)
+                       swap_off = left_off;
+
+               /* Is the right child larger than the parent? */
+               if (right_off < heap->bh_size &&
+                       heap->bh_compare(heap->bh_nodes[node_off],
+                                                        heap->bh_nodes[right_off],
+                                                        heap->bh_arg) < 0)
+               {
+                       /* swap with the larger child */
+                       if (!swap_off ||
+                               heap->bh_compare(heap->bh_nodes[left_off],
+                                                                heap->bh_nodes[right_off],
+                                                                heap->bh_arg) < 0)
+                               swap_off = right_off;
+               }
+
+               /*
+                * If we didn't find anything to swap, the heap condition is
+                * satisfied, and we're done.
+                */
+               if (!swap_off)
+                       break;
+
+               /*
+                * Otherwise, swap the node with the child that violates the heap
+                * property; then go on to check its children.
+                */
+               swap_nodes(heap, swap_off, node_off);
+               node_off = swap_off;
+       }
+}
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
new file mode 100644 (file)
index 0000000..449ceb5
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * binaryheap.h
+ *
+ * A simple binary heap implementation
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/lib/binaryheap.h
+ */
+
+#ifndef BINARYHEAP_H
+#define BINARYHEAP_H
+
+/*
+ * For a max-heap, the comparator must return <0 iff a < b, 0 iff a == b,
+ * and >0 iff a > b.  For a min-heap, the conditions are reversed.
+ */
+typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg);
+
+/*
+ * binaryheap
+ *
+ *             bh_size                 how many nodes are currently in "nodes"
+ *             bh_space                how many nodes can be stored in "nodes"
+ *             bh_has_heap_property    no unordered operations since last heap build
+ *             bh_compare              comparison function to define the heap property
+ *             bh_arg                  user data for comparison function
+ *             bh_nodes                variable-length array of "space" nodes
+ */
+typedef struct binaryheap
+{
+       int                     bh_size;
+       int                     bh_space;
+       bool            bh_has_heap_property;   /* debugging cross-check */
+       binaryheap_comparator bh_compare;
+       void       *bh_arg;
+       Datum           bh_nodes[FLEXIBLE_ARRAY_MEMBER];
+}      binaryheap;
+
+extern binaryheap *binaryheap_allocate(int capacity,
+                                                               binaryheap_comparator compare,
+                                                               void *arg);
+extern void binaryheap_free(binaryheap *heap);
+extern void binaryheap_add_unordered(binaryheap *heap, Datum d);
+extern void binaryheap_build(binaryheap *heap);
+extern void binaryheap_add(binaryheap *heap, Datum d);
+extern Datum binaryheap_first(binaryheap *heap);
+extern Datum binaryheap_remove_first(binaryheap *heap);
+extern void binaryheap_replace_first(binaryheap *heap, Datum d);
+
+#define binaryheap_empty(h)                    ((h)->bh_size == 0)
+
+#endif   /* BINARYHEAP_H */
index fec07b8e426e479e10511404bbd198bae7e26282..d4911bd2ae2d53c3af105807867493e8d4536541 100644 (file)
@@ -1100,10 +1100,8 @@ typedef struct AppendState
  *             nkeys                   number of sort key columns
  *             sortkeys                sort keys in SortSupport representation
  *             slots                   current output tuple of each subplan
- *             heap                    heap of active tuples (represented as array indexes)
- *             heap_size               number of active heap entries
+ *             heap                    heap of active tuples
  *             initialized             true if we have fetched first tuple from each subplan
- *             last_slot               last subplan fetched from (which must be re-called)
  * ----------------
  */
 typedef struct MergeAppendState
@@ -1114,10 +1112,8 @@ typedef struct MergeAppendState
        int                     ms_nkeys;
        SortSupport ms_sortkeys;        /* array of length ms_nkeys */
        TupleTableSlot **ms_slots;      /* array of length ms_nplans */
-       int                *ms_heap;            /* array of length ms_nplans */
-       int                     ms_heap_size;   /* current active length of ms_heap[] */
+       struct binaryheap *ms_heap; /* binary heap of slot indices */
        bool            ms_initialized; /* are subplans started? */
-       int                     ms_last_slot;   /* last subplan slot we returned from */
 } MergeAppendState;
 
 /* ----------------