From: Tom Lane Date: Sat, 6 Dec 2003 22:55:11 +0000 (+0000) Subject: Replace not-very-bright implementation of topological sort with a better X-Git-Tag: REL8_0_0BETA1~1567 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=79273cc7d2966240f83ae87fd1a98d96f7746592;p=postgresql Replace not-very-bright implementation of topological sort with a better one (use a priority heap to keep track of items ready to output, instead of searching the input array each time). This brings the runtime of pg_dump back to about what it was in 7.4. --- diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 12be66dadf..7db36b0d25 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.1 2003/12/06 03:00:16 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.2 2003/12/06 22:55:11 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -52,6 +52,8 @@ static bool TopoSort(DumpableObject **objs, int numObjs, DumpableObject **ordering, int *nOrdering); +static void addHeapElement(int val, int *heap, int heapLength); +static int removeHeapElement(int *heap, int heapLength); static bool findLoop(DumpableObject *obj, int depth, DumpableObject **ordering, @@ -122,14 +124,13 @@ sortDumpableObjects(DumpableObject **objs, int numObjs) * partial ordering.) Minimize rearrangement of the list not needed to * achieve the partial ordering. * - * This is a lot simpler and slower than, for example, the topological sort - * algorithm shown in Knuth's Volume 1. However, Knuth's method doesn't - * try to minimize the damage to the existing order. + * The input is the list of numObjs objects in objs[]. This list is not + * modified. * * Returns TRUE if able to build an ordering that satisfies all the * constraints, FALSE if not (there are contradictory constraints). * - * On success (TRUE result), ordering[] is filled with an array of + * On success (TRUE result), ordering[] is filled with a sorted array of * DumpableObject pointers, of length equal to the input list length. * * On failure (FALSE result), ordering[] is filled with an array of @@ -146,36 +147,60 @@ TopoSort(DumpableObject **objs, int *nOrdering) /* output argument */ { DumpId maxDumpId = getMaxDumpId(); - bool result = true; - DumpableObject **topoItems; - DumpableObject *obj; + int *pendingHeap; int *beforeConstraints; + int *idMap; + DumpableObject *obj; + int heapLength; int i, j, - k, - last; + k; - /* First, create work array with the dump items in their current order */ - topoItems = (DumpableObject **) malloc(numObjs * sizeof(DumpableObject *)); - if (topoItems == NULL) - exit_horribly(NULL, modulename, "out of memory\n"); - memcpy(topoItems, objs, numObjs * sizeof(DumpableObject *)); + /* + * This is basically the same algorithm shown for topological sorting in + * Knuth's Volume 1. However, we would like to minimize unnecessary + * rearrangement of the input ordering; that is, when we have a choice + * of which item to output next, we always want to take the one highest + * in the original list. Therefore, instead of maintaining an unordered + * linked list of items-ready-to-output as Knuth does, we maintain a heap + * of their item numbers, which we can use as a priority queue. This + * turns the algorithm from O(N) to O(N log N) because each insertion or + * removal of a heap item takes O(log N) time. However, that's still + * plenty fast enough for this application. + */ *nOrdering = numObjs; /* for success return */ + /* Eliminate the null case */ + if (numObjs <= 0) + return true; + + /* Create workspace for the above-described heap */ + pendingHeap = (int *) malloc(numObjs * sizeof(int)); + if (pendingHeap == NULL) + exit_horribly(NULL, modulename, "out of memory\n"); + /* - * Scan the constraints, and for each item in the array, generate a + * Scan the constraints, and for each item in the input, generate a * count of the number of constraints that say it must be before * something else. The count for the item with dumpId j is - * stored in beforeConstraints[j]. + * stored in beforeConstraints[j]. We also make a map showing the + * input-order index of the item with dumpId j. */ beforeConstraints = (int *) malloc((maxDumpId + 1) * sizeof(int)); if (beforeConstraints == NULL) exit_horribly(NULL, modulename, "out of memory\n"); memset(beforeConstraints, 0, (maxDumpId + 1) * sizeof(int)); + idMap = (int *) malloc((maxDumpId + 1) * sizeof(int)); + if (idMap == NULL) + exit_horribly(NULL, modulename, "out of memory\n"); for (i = 0; i < numObjs; i++) { - obj = topoItems[i]; + obj = objs[i]; + j = obj->dumpId; + if (j <= 0 || j > maxDumpId) + exit_horribly(NULL, modulename, "invalid dumpId %d\n", j); + idMap[j] = i; for (j = 0; j < obj->nDeps; j++) { k = obj->dependencies[j]; @@ -185,63 +210,153 @@ TopoSort(DumpableObject **objs, } } + /* + * Now initialize the heap of items-ready-to-output by filling it with + * the indexes of items that already have beforeConstraints[id] == 0. + * + * The essential property of a heap is heap[(j-1)/2] >= heap[j] for each + * j in the range 1..heapLength-1 (note we are using 0-based subscripts + * here, while the discussion in Knuth assumes 1-based subscripts). + * So, if we simply enter the indexes into pendingHeap[] in decreasing + * order, we a-fortiori have the heap invariant satisfied at completion + * of this loop, and don't need to do any sift-up comparisons. + */ + heapLength = 0; + for (i = numObjs; --i >= 0; ) + { + if (beforeConstraints[objs[i]->dumpId] == 0) + pendingHeap[heapLength++] = i; + } + /*-------------------- - * Now scan the topoItems array backwards. At each step, output the - * last item that has no remaining before-constraints, and decrease - * the beforeConstraints count of each of the items it was constrained - * against. - * i = index of ordering[] entry we want to output this time - * j = search index for topoItems[] + * Now emit objects, working backwards in the output list. At each step, + * we use the priority heap to select the last item that has no remaining + * before-constraints. We remove that item from the heap, output it to + * ordering[], and decrease the beforeConstraints count of each of the + * items it was constrained against. Whenever an item's beforeConstraints + * count is thereby decreased to zero, we insert it into the priority heap + * to show that it is a candidate to output. We are done when the heap + * becomes empty; if we have output every element then we succeeded, + * otherwise we failed. + * i = number of ordering[] entries left to output + * j = objs[] index of item we are outputting * k = temp for scanning constraint list for item j - * last = last non-null index in topoItems (avoid redundant searches) *-------------------- */ - last = numObjs - 1; - for (i = numObjs; --i >= 0;) + i = numObjs; + while (heapLength > 0) { - /* Find next candidate to output */ - while (topoItems[last] == NULL) - last--; - for (j = last; j >= 0; j--) - { - obj = topoItems[j]; - if (obj != NULL && beforeConstraints[obj->dumpId] == 0) - break; - } - /* If no available candidate, topological sort fails */ - if (j < 0) - { - result = false; - break; - } - /* Output candidate, and mark it done by zeroing topoItems[] entry */ - ordering[i] = obj = topoItems[j]; - topoItems[j] = NULL; + /* Select object to output by removing largest heap member */ + j = removeHeapElement(pendingHeap, heapLength--); + obj = objs[j]; + /* Output candidate to ordering[] */ + ordering[--i] = obj; /* Update beforeConstraints counts of its predecessors */ for (k = 0; k < obj->nDeps; k++) - beforeConstraints[obj->dependencies[k]]--; + { + int id = obj->dependencies[k]; + + if ((--beforeConstraints[id]) == 0) + addHeapElement(idMap[id], pendingHeap, heapLength++); + } } /* - * If we failed, report one of the circular constraint sets + * If we failed, report one of the circular constraint sets. We do + * this by scanning beforeConstraints[] to locate the items that have + * not yet been output, and for each one, trying to trace a constraint + * loop leading back to it. (There may be items that depend on items + * involved in a loop, but aren't themselves part of the loop, so not + * every nonzero beforeConstraints entry is necessarily a useful + * starting point. We keep trying till we find a loop.) */ - if (!result) + if (i != 0) { - for (j = last; j >= 0; j--) + for (j = 1; j <= maxDumpId; j++) { - ordering[0] = obj = topoItems[j]; - if (obj && findLoop(obj, 1, ordering, nOrdering)) - break; + if (beforeConstraints[j] != 0) + { + ordering[0] = obj = objs[idMap[j]]; + if (findLoop(obj, 1, ordering, nOrdering)) + break; + } } - if (j < 0) + if (j > maxDumpId) exit_horribly(NULL, modulename, "could not find dependency loop\n"); } /* Done */ - free(topoItems); + free(pendingHeap); free(beforeConstraints); + free(idMap); + + return (i == 0); +} + +/* + * Add an item to a heap (priority queue) + * + * heapLength is the current heap size; caller is responsible for increasing + * its value after the call. There must be sufficient storage at *heap. + */ +static void +addHeapElement(int val, int *heap, int heapLength) +{ + int j; + /* + * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth + * is using 1-based array indexes, not 0-based. + */ + j = heapLength; + while (j > 0) + { + int i = (j - 1) >> 1; + + if (val <= heap[i]) + break; + heap[j] = heap[i]; + j = i; + } + heap[j] = val; +} + +/* + * Remove the largest item present in a heap (priority queue) + * + * heapLength is the current heap size; caller is responsible for decreasing + * its value after the call. + * + * We remove and return heap[0], which is always the largest element of + * the heap, and then "sift up" to maintain the heap invariant. + */ +static int +removeHeapElement(int *heap, int heapLength) +{ + int result = heap[0]; + int val; + int i; + + if (--heapLength <= 0) + return result; + val = heap[heapLength]; /* value that must be reinserted */ + i = 0; /* i is where the "hole" is */ + for (;;) + { + int j = 2 * i + 1; + + if (j >= heapLength) + break; + if (j + 1 < heapLength && + heap[j] < heap[j + 1]) + j++; + if (val >= heap[j]) + break; + heap[i] = heap[j]; + i = j; + } + heap[i] = val; return result; }