Replace not-very-bright implementation of topological sort with a better

one (use a priority heap to keep track of items ready to output, instead
of searching the input array each time).  This brings the runtime of
pg_dump back to about what it was in 7.4.
This commit is contained in:
Tom Lane 2003-12-06 22:55:11 +00:00
parent 005a1217fb
commit 79273cc7d2
1 changed files with 169 additions and 54 deletions

View File

@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.1 2003/12/06 03:00:16 tgl Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.2 2003/12/06 22:55:11 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -52,6 +52,8 @@ static bool TopoSort(DumpableObject **objs,
int numObjs,
DumpableObject **ordering,
int *nOrdering);
static void addHeapElement(int val, int *heap, int heapLength);
static int removeHeapElement(int *heap, int heapLength);
static bool findLoop(DumpableObject *obj,
int depth,
DumpableObject **ordering,
@ -122,14 +124,13 @@ sortDumpableObjects(DumpableObject **objs, int numObjs)
* partial ordering.) Minimize rearrangement of the list not needed to
* achieve the partial ordering.
*
* This is a lot simpler and slower than, for example, the topological sort
* algorithm shown in Knuth's Volume 1. However, Knuth's method doesn't
* try to minimize the damage to the existing order.
* The input is the list of numObjs objects in objs[]. This list is not
* modified.
*
* Returns TRUE if able to build an ordering that satisfies all the
* constraints, FALSE if not (there are contradictory constraints).
*
* On success (TRUE result), ordering[] is filled with an array of
* On success (TRUE result), ordering[] is filled with a sorted array of
* DumpableObject pointers, of length equal to the input list length.
*
* On failure (FALSE result), ordering[] is filled with an array of
@ -146,36 +147,60 @@ TopoSort(DumpableObject **objs,
int *nOrdering) /* output argument */
{
DumpId maxDumpId = getMaxDumpId();
bool result = true;
DumpableObject **topoItems;
DumpableObject *obj;
int *pendingHeap;
int *beforeConstraints;
int *idMap;
DumpableObject *obj;
int heapLength;
int i,
j,
k,
last;
k;
/* First, create work array with the dump items in their current order */
topoItems = (DumpableObject **) malloc(numObjs * sizeof(DumpableObject *));
if (topoItems == NULL)
exit_horribly(NULL, modulename, "out of memory\n");
memcpy(topoItems, objs, numObjs * sizeof(DumpableObject *));
/*
* This is basically the same algorithm shown for topological sorting in
* Knuth's Volume 1. However, we would like to minimize unnecessary
* rearrangement of the input ordering; that is, when we have a choice
* of which item to output next, we always want to take the one highest
* in the original list. Therefore, instead of maintaining an unordered
* linked list of items-ready-to-output as Knuth does, we maintain a heap
* of their item numbers, which we can use as a priority queue. This
* turns the algorithm from O(N) to O(N log N) because each insertion or
* removal of a heap item takes O(log N) time. However, that's still
* plenty fast enough for this application.
*/
*nOrdering = numObjs; /* for success return */
/* Eliminate the null case */
if (numObjs <= 0)
return true;
/* Create workspace for the above-described heap */
pendingHeap = (int *) malloc(numObjs * sizeof(int));
if (pendingHeap == NULL)
exit_horribly(NULL, modulename, "out of memory\n");
/*
* Scan the constraints, and for each item in the array, generate a
* Scan the constraints, and for each item in the input, generate a
* count of the number of constraints that say it must be before
* something else. The count for the item with dumpId j is
* stored in beforeConstraints[j].
* stored in beforeConstraints[j]. We also make a map showing the
* input-order index of the item with dumpId j.
*/
beforeConstraints = (int *) malloc((maxDumpId + 1) * sizeof(int));
if (beforeConstraints == NULL)
exit_horribly(NULL, modulename, "out of memory\n");
memset(beforeConstraints, 0, (maxDumpId + 1) * sizeof(int));
idMap = (int *) malloc((maxDumpId + 1) * sizeof(int));
if (idMap == NULL)
exit_horribly(NULL, modulename, "out of memory\n");
for (i = 0; i < numObjs; i++)
{
obj = topoItems[i];
obj = objs[i];
j = obj->dumpId;
if (j <= 0 || j > maxDumpId)
exit_horribly(NULL, modulename, "invalid dumpId %d\n", j);
idMap[j] = i;
for (j = 0; j < obj->nDeps; j++)
{
k = obj->dependencies[j];
@ -185,63 +210,153 @@ TopoSort(DumpableObject **objs,
}
}
/*
* Now initialize the heap of items-ready-to-output by filling it with
* the indexes of items that already have beforeConstraints[id] == 0.
*
* The essential property of a heap is heap[(j-1)/2] >= heap[j] for each
* j in the range 1..heapLength-1 (note we are using 0-based subscripts
* here, while the discussion in Knuth assumes 1-based subscripts).
* So, if we simply enter the indexes into pendingHeap[] in decreasing
* order, we a-fortiori have the heap invariant satisfied at completion
* of this loop, and don't need to do any sift-up comparisons.
*/
heapLength = 0;
for (i = numObjs; --i >= 0; )
{
if (beforeConstraints[objs[i]->dumpId] == 0)
pendingHeap[heapLength++] = i;
}
/*--------------------
* Now scan the topoItems array backwards. At each step, output the
* last item that has no remaining before-constraints, and decrease
* the beforeConstraints count of each of the items it was constrained
* against.
* i = index of ordering[] entry we want to output this time
* j = search index for topoItems[]
* Now emit objects, working backwards in the output list. At each step,
* we use the priority heap to select the last item that has no remaining
* before-constraints. We remove that item from the heap, output it to
* ordering[], and decrease the beforeConstraints count of each of the
* items it was constrained against. Whenever an item's beforeConstraints
* count is thereby decreased to zero, we insert it into the priority heap
* to show that it is a candidate to output. We are done when the heap
* becomes empty; if we have output every element then we succeeded,
* otherwise we failed.
* i = number of ordering[] entries left to output
* j = objs[] index of item we are outputting
* k = temp for scanning constraint list for item j
* last = last non-null index in topoItems (avoid redundant searches)
*--------------------
*/
last = numObjs - 1;
for (i = numObjs; --i >= 0;)
i = numObjs;
while (heapLength > 0)
{
/* Find next candidate to output */
while (topoItems[last] == NULL)
last--;
for (j = last; j >= 0; j--)
{
obj = topoItems[j];
if (obj != NULL && beforeConstraints[obj->dumpId] == 0)
break;
}
/* If no available candidate, topological sort fails */
if (j < 0)
{
result = false;
break;
}
/* Output candidate, and mark it done by zeroing topoItems[] entry */
ordering[i] = obj = topoItems[j];
topoItems[j] = NULL;
/* Select object to output by removing largest heap member */
j = removeHeapElement(pendingHeap, heapLength--);
obj = objs[j];
/* Output candidate to ordering[] */
ordering[--i] = obj;
/* Update beforeConstraints counts of its predecessors */
for (k = 0; k < obj->nDeps; k++)
beforeConstraints[obj->dependencies[k]]--;
{
int id = obj->dependencies[k];
if ((--beforeConstraints[id]) == 0)
addHeapElement(idMap[id], pendingHeap, heapLength++);
}
}
/*
* If we failed, report one of the circular constraint sets
* If we failed, report one of the circular constraint sets. We do
* this by scanning beforeConstraints[] to locate the items that have
* not yet been output, and for each one, trying to trace a constraint
* loop leading back to it. (There may be items that depend on items
* involved in a loop, but aren't themselves part of the loop, so not
* every nonzero beforeConstraints entry is necessarily a useful
* starting point. We keep trying till we find a loop.)
*/
if (!result)
if (i != 0)
{
for (j = last; j >= 0; j--)
for (j = 1; j <= maxDumpId; j++)
{
ordering[0] = obj = topoItems[j];
if (obj && findLoop(obj, 1, ordering, nOrdering))
break;
if (beforeConstraints[j] != 0)
{
ordering[0] = obj = objs[idMap[j]];
if (findLoop(obj, 1, ordering, nOrdering))
break;
}
}
if (j < 0)
if (j > maxDumpId)
exit_horribly(NULL, modulename,
"could not find dependency loop\n");
}
/* Done */
free(topoItems);
free(pendingHeap);
free(beforeConstraints);
free(idMap);
return (i == 0);
}
/*
* Add an item to a heap (priority queue)
*
* heapLength is the current heap size; caller is responsible for increasing
* its value after the call. There must be sufficient storage at *heap.
*/
static void
addHeapElement(int val, int *heap, int heapLength)
{
int j;
/*
* Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
* is using 1-based array indexes, not 0-based.
*/
j = heapLength;
while (j > 0)
{
int i = (j - 1) >> 1;
if (val <= heap[i])
break;
heap[j] = heap[i];
j = i;
}
heap[j] = val;
}
/*
* Remove the largest item present in a heap (priority queue)
*
* heapLength is the current heap size; caller is responsible for decreasing
* its value after the call.
*
* We remove and return heap[0], which is always the largest element of
* the heap, and then "sift up" to maintain the heap invariant.
*/
static int
removeHeapElement(int *heap, int heapLength)
{
int result = heap[0];
int val;
int i;
if (--heapLength <= 0)
return result;
val = heap[heapLength]; /* value that must be reinserted */
i = 0; /* i is where the "hole" is */
for (;;)
{
int j = 2 * i + 1;
if (j >= heapLength)
break;
if (j + 1 < heapLength &&
heap[j] < heap[j + 1])
j++;
if (val >= heap[j])
break;
heap[i] = heap[j];
i = j;
}
heap[i] = val;
return result;
}