]> granicus.if.org Git - postgresql/commitdiff
Replace not-very-bright implementation of topological sort with a better
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 6 Dec 2003 22:55:11 +0000 (22:55 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 6 Dec 2003 22:55:11 +0000 (22:55 +0000)
one (use a priority heap to keep track of items ready to output, instead
of searching the input array each time).  This brings the runtime of
pg_dump back to about what it was in 7.4.

src/bin/pg_dump/pg_dump_sort.c

index 12be66dadf7dce57432e6e221ec7db1af883c390..7db36b0d2569e3599cf4841d8085d5bf85ed9245 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.1 2003/12/06 03:00:16 tgl Exp $
+ *       $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.2 2003/12/06 22:55:11 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -52,6 +52,8 @@ static bool TopoSort(DumpableObject **objs,
                                         int numObjs,
                                         DumpableObject **ordering,
                                         int *nOrdering);
+static void addHeapElement(int val, int *heap, int heapLength);
+static int     removeHeapElement(int *heap, int heapLength);
 static bool findLoop(DumpableObject *obj,
                                         int depth,
                                         DumpableObject **ordering,
@@ -122,14 +124,13 @@ sortDumpableObjects(DumpableObject **objs, int numObjs)
  * partial ordering.)  Minimize rearrangement of the list not needed to
  * achieve the partial ordering.
  *
- * This is a lot simpler and slower than, for example, the topological sort
- * algorithm shown in Knuth's Volume 1.  However, Knuth's method doesn't
- * try to minimize the damage to the existing order.
+ * The input is the list of numObjs objects in objs[].  This list is not
+ * modified.
  *
  * Returns TRUE if able to build an ordering that satisfies all the
  * constraints, FALSE if not (there are contradictory constraints).
  *
- * On success (TRUE result), ordering[] is filled with an array of
+ * On success (TRUE result), ordering[] is filled with a sorted array of
  * DumpableObject pointers, of length equal to the input list length.
  *
  * On failure (FALSE result), ordering[] is filled with an array of
@@ -146,36 +147,60 @@ TopoSort(DumpableObject **objs,
                 int *nOrdering)                                /* output argument */
 {
        DumpId          maxDumpId = getMaxDumpId();
-       bool            result = true;
-       DumpableObject  **topoItems;
-       DumpableObject   *obj;
+       int                *pendingHeap;
        int                *beforeConstraints;
+       int                *idMap;
+       DumpableObject   *obj;
+       int                     heapLength;
        int                     i,
                                j,
-                               k,
-                               last;
+                               k;
 
-       /* First, create work array with the dump items in their current order */
-       topoItems = (DumpableObject **) malloc(numObjs * sizeof(DumpableObject *));
-       if (topoItems == NULL)
-               exit_horribly(NULL, modulename, "out of memory\n");
-       memcpy(topoItems, objs, numObjs * sizeof(DumpableObject *));
+       /*
+        * This is basically the same algorithm shown for topological sorting in
+        * Knuth's Volume 1.  However, we would like to minimize unnecessary
+        * rearrangement of the input ordering; that is, when we have a choice
+        * of which item to output next, we always want to take the one highest
+        * in the original list.  Therefore, instead of maintaining an unordered
+        * linked list of items-ready-to-output as Knuth does, we maintain a heap
+        * of their item numbers, which we can use as a priority queue.  This
+        * turns the algorithm from O(N) to O(N log N) because each insertion or
+        * removal of a heap item takes O(log N) time.  However, that's still
+        * plenty fast enough for this application.
+        */
 
        *nOrdering = numObjs;   /* for success return */
 
+       /* Eliminate the null case */
+       if (numObjs <= 0)
+               return true;
+
+       /* Create workspace for the above-described heap */
+       pendingHeap = (int *) malloc(numObjs * sizeof(int));
+       if (pendingHeap == NULL)
+               exit_horribly(NULL, modulename, "out of memory\n");
+
        /*
-        * Scan the constraints, and for each item in the array, generate a
+        * Scan the constraints, and for each item in the input, generate a
         * count of the number of constraints that say it must be before
         * something else. The count for the item with dumpId j is
-        * stored in beforeConstraints[j].
+        * stored in beforeConstraints[j].  We also make a map showing the
+        * input-order index of the item with dumpId j.
         */
        beforeConstraints = (int *) malloc((maxDumpId + 1) * sizeof(int));
        if (beforeConstraints == NULL)
                exit_horribly(NULL, modulename, "out of memory\n");
        memset(beforeConstraints, 0, (maxDumpId + 1) * sizeof(int));
+       idMap = (int *) malloc((maxDumpId + 1) * sizeof(int));
+       if (idMap == NULL)
+               exit_horribly(NULL, modulename, "out of memory\n");
        for (i = 0; i < numObjs; i++)
        {
-               obj = topoItems[i];
+               obj = objs[i];
+               j = obj->dumpId;
+               if (j <= 0 || j > maxDumpId)
+                       exit_horribly(NULL, modulename, "invalid dumpId %d\n", j);
+               idMap[j] = i;
                for (j = 0; j < obj->nDeps; j++)
                {
                        k = obj->dependencies[j];
@@ -185,63 +210,153 @@ TopoSort(DumpableObject **objs,
                }
        }
 
+       /*
+        * Now initialize the heap of items-ready-to-output by filling it with
+        * the indexes of items that already have beforeConstraints[id] == 0.
+        *
+        * The essential property of a heap is heap[(j-1)/2] >= heap[j] for each
+        * j in the range 1..heapLength-1 (note we are using 0-based subscripts
+        * here, while the discussion in Knuth assumes 1-based subscripts).
+        * So, if we simply enter the indexes into pendingHeap[] in decreasing
+        * order, we a-fortiori have the heap invariant satisfied at completion
+        * of this loop, and don't need to do any sift-up comparisons.
+        */
+       heapLength = 0;
+       for (i = numObjs; --i >= 0; )
+       {
+               if (beforeConstraints[objs[i]->dumpId] == 0)
+                       pendingHeap[heapLength++] = i;
+       }
+
        /*--------------------
-        * Now scan the topoItems array backwards.      At each step, output the
-        * last item that has no remaining before-constraints, and decrease
-        * the beforeConstraints count of each of the items it was constrained
-        * against.
-        * i = index of ordering[] entry we want to output this time
-        * j = search index for topoItems[]
+        * Now emit objects, working backwards in the output list.  At each step,
+        * we use the priority heap to select the last item that has no remaining
+        * before-constraints.  We remove that item from the heap, output it to
+        * ordering[], and decrease the beforeConstraints count of each of the
+        * items it was constrained against.  Whenever an item's beforeConstraints
+        * count is thereby decreased to zero, we insert it into the priority heap
+        * to show that it is a candidate to output.  We are done when the heap
+        * becomes empty; if we have output every element then we succeeded,
+        * otherwise we failed.
+        * i = number of ordering[] entries left to output
+        * j = objs[] index of item we are outputting
         * k = temp for scanning constraint list for item j
-        * last = last non-null index in topoItems (avoid redundant searches)
         *--------------------
         */
-       last = numObjs - 1;
-       for (i = numObjs; --i >= 0;)
+       i = numObjs;
+       while (heapLength > 0)
        {
-               /* Find next candidate to output */
-               while (topoItems[last] == NULL)
-                       last--;
-               for (j = last; j >= 0; j--)
-               {
-                       obj = topoItems[j];
-                       if (obj != NULL && beforeConstraints[obj->dumpId] == 0)
-                               break;
-               }
-               /* If no available candidate, topological sort fails */
-               if (j < 0)
-               {
-                       result = false;
-                       break;
-               }
-               /* Output candidate, and mark it done by zeroing topoItems[] entry */
-               ordering[i] = obj = topoItems[j];
-               topoItems[j] = NULL;
+               /* Select object to output by removing largest heap member */
+               j = removeHeapElement(pendingHeap, heapLength--);
+               obj = objs[j];
+               /* Output candidate to ordering[] */
+               ordering[--i] = obj;
                /* Update beforeConstraints counts of its predecessors */
                for (k = 0; k < obj->nDeps; k++)
-                       beforeConstraints[obj->dependencies[k]]--;
+               {
+                       int             id = obj->dependencies[k];
+
+                       if ((--beforeConstraints[id]) == 0)
+                               addHeapElement(idMap[id], pendingHeap, heapLength++);
+               }
        }
 
        /*
-        * If we failed, report one of the circular constraint sets
+        * If we failed, report one of the circular constraint sets.  We do
+        * this by scanning beforeConstraints[] to locate the items that have
+        * not yet been output, and for each one, trying to trace a constraint
+        * loop leading back to it.  (There may be items that depend on items
+        * involved in a loop, but aren't themselves part of the loop, so not
+        * every nonzero beforeConstraints entry is necessarily a useful
+        * starting point.  We keep trying till we find a loop.)
         */
-       if (!result)
+       if (i != 0)
        {
-               for (j = last; j >= 0; j--)
+               for (j = 1; j <= maxDumpId; j++)
                {
-                       ordering[0] = obj = topoItems[j];
-                       if (obj && findLoop(obj, 1, ordering, nOrdering))
-                               break;
+                       if (beforeConstraints[j] != 0)
+                       {
+                               ordering[0] = obj = objs[idMap[j]];
+                               if (findLoop(obj, 1, ordering, nOrdering))
+                                       break;
+                       }
                }
-               if (j < 0)
+               if (j > maxDumpId)
                        exit_horribly(NULL, modulename,
                                                  "could not find dependency loop\n");
        }
 
        /* Done */
-       free(topoItems);
+       free(pendingHeap);
        free(beforeConstraints);
+       free(idMap);
+
+       return (i == 0);
+}
+
+/*
+ * Add an item to a heap (priority queue)
+ *
+ * heapLength is the current heap size; caller is responsible for increasing
+ * its value after the call.  There must be sufficient storage at *heap.
+ */
+static void
+addHeapElement(int val, int *heap, int heapLength)
+{
+       int                     j;
 
+       /*
+        * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
+        * is using 1-based array indexes, not 0-based.
+        */
+       j = heapLength;
+       while (j > 0)
+       {
+               int                     i = (j - 1) >> 1;
+
+               if (val <= heap[i])
+                       break;
+               heap[j] = heap[i];
+               j = i;
+       }
+       heap[j] = val;
+}
+
+/*
+ * Remove the largest item present in a heap (priority queue)
+ *
+ * heapLength is the current heap size; caller is responsible for decreasing
+ * its value after the call.
+ *
+ * We remove and return heap[0], which is always the largest element of
+ * the heap, and then "sift up" to maintain the heap invariant.
+ */
+static int
+removeHeapElement(int *heap, int heapLength)
+{
+       int                     result = heap[0];
+       int                     val;
+       int                     i;
+
+       if (--heapLength <= 0)
+               return result;
+       val = heap[heapLength];         /* value that must be reinserted */
+       i = 0;                                          /* i is where the "hole" is */
+       for (;;)
+       {
+               int                     j = 2 * i + 1;
+
+               if (j >= heapLength)
+                       break;
+               if (j + 1 < heapLength &&
+                       heap[j] < heap[j + 1])
+                       j++;
+               if (val >= heap[j])
+                       break;
+               heap[i] = heap[j];
+               i = j;
+       }
+       heap[i] = val;
        return result;
 }