]> granicus.if.org Git - postgresql/blob - src/bin/pg_dump/pg_dump_sort.c
Run pgindent on 9.2 source tree in preparation for first 9.3
[postgresql] / src / bin / pg_dump / pg_dump_sort.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_dump_sort.c
4  *        Sort the items of a dump into a safe order for dumping
5  *
6  *
7  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        src/bin/pg_dump/pg_dump_sort.c
13  *
14  *-------------------------------------------------------------------------
15  */
16 #include "pg_backup_archiver.h"
17 #include "dumputils.h"
18 #include "dumpmem.h"
19
20 static const char *modulename = gettext_noop("sorter");
21
22 /*
23  * Sort priority for object types when dumping a pre-7.3 database.
24  * Objects are sorted by priority levels, and within an equal priority level
25  * by OID.      (This is a relatively crude hack to provide semi-reasonable
26  * behavior for old databases without full dependency info.)  Note: collations,
27  * extensions, text search, foreign-data, and default ACL objects can't really
28  * happen here, so the rather bogus priorities for them don't matter.
29  */
30 static const int oldObjectTypePriority[] =
31 {
32         1,                                                      /* DO_NAMESPACE */
33         1,                                                      /* DO_EXTENSION */
34         2,                                                      /* DO_TYPE */
35         2,                                                      /* DO_SHELL_TYPE */
36         2,                                                      /* DO_FUNC */
37         3,                                                      /* DO_AGG */
38         3,                                                      /* DO_OPERATOR */
39         4,                                                      /* DO_OPCLASS */
40         4,                                                      /* DO_OPFAMILY */
41         5,                                                      /* DO_CONVERSION */
42         6,                                                      /* DO_TABLE */
43         8,                                                      /* DO_ATTRDEF */
44         13,                                                     /* DO_INDEX */
45         14,                                                     /* DO_RULE */
46         15,                                                     /* DO_TRIGGER */
47         12,                                                     /* DO_CONSTRAINT */
48         16,                                                     /* DO_FK_CONSTRAINT */
49         2,                                                      /* DO_PROCLANG */
50         2,                                                      /* DO_CAST */
51         10,                                                     /* DO_TABLE_DATA */
52         7,                                                      /* DO_DUMMY_TYPE */
53         3,                                                      /* DO_TSPARSER */
54         4,                                                      /* DO_TSDICT */
55         3,                                                      /* DO_TSTEMPLATE */
56         5,                                                      /* DO_TSCONFIG */
57         3,                                                      /* DO_FDW */
58         4,                                                      /* DO_FOREIGN_SERVER */
59         17,                                                     /* DO_DEFAULT_ACL */
60         9,                                                      /* DO_BLOB */
61         11,                                                     /* DO_BLOB_DATA */
62         2                                                       /* DO_COLLATION */
63 };
64
65 /*
66  * Sort priority for object types when dumping newer databases.
67  * Objects are sorted by type, and within a type by name.
68  */
69 static const int newObjectTypePriority[] =
70 {
71         1,                                                      /* DO_NAMESPACE */
72         4,                                                      /* DO_EXTENSION */
73         5,                                                      /* DO_TYPE */
74         5,                                                      /* DO_SHELL_TYPE */
75         6,                                                      /* DO_FUNC */
76         7,                                                      /* DO_AGG */
77         8,                                                      /* DO_OPERATOR */
78         9,                                                      /* DO_OPCLASS */
79         9,                                                      /* DO_OPFAMILY */
80         11,                                                     /* DO_CONVERSION */
81         18,                                                     /* DO_TABLE */
82         20,                                                     /* DO_ATTRDEF */
83         25,                                                     /* DO_INDEX */
84         26,                                                     /* DO_RULE */
85         27,                                                     /* DO_TRIGGER */
86         24,                                                     /* DO_CONSTRAINT */
87         28,                                                     /* DO_FK_CONSTRAINT */
88         2,                                                      /* DO_PROCLANG */
89         10,                                                     /* DO_CAST */
90         22,                                                     /* DO_TABLE_DATA */
91         19,                                                     /* DO_DUMMY_TYPE */
92         12,                                                     /* DO_TSPARSER */
93         14,                                                     /* DO_TSDICT */
94         13,                                                     /* DO_TSTEMPLATE */
95         15,                                                     /* DO_TSCONFIG */
96         16,                                                     /* DO_FDW */
97         17,                                                     /* DO_FOREIGN_SERVER */
98         29,                                                     /* DO_DEFAULT_ACL */
99         21,                                                     /* DO_BLOB */
100         23,                                                     /* DO_BLOB_DATA */
101         3                                                       /* DO_COLLATION */
102 };
103
104
105 static int      DOTypeNameCompare(const void *p1, const void *p2);
106 static int      DOTypeOidCompare(const void *p1, const void *p2);
107 static bool TopoSort(DumpableObject **objs,
108                  int numObjs,
109                  DumpableObject **ordering,
110                  int *nOrdering);
111 static void addHeapElement(int val, int *heap, int heapLength);
112 static int      removeHeapElement(int *heap, int heapLength);
113 static void findDependencyLoops(DumpableObject **objs, int nObjs, int totObjs);
114 static int findLoop(DumpableObject *obj,
115                  DumpId startPoint,
116                  bool *processed,
117                  DumpableObject **workspace,
118                  int depth);
119 static void repairDependencyLoop(DumpableObject **loop,
120                                          int nLoop);
121 static void describeDumpableObject(DumpableObject *obj,
122                                            char *buf, int bufsize);
123
124
125 /*
126  * Sort the given objects into a type/name-based ordering
127  *
128  * Normally this is just the starting point for the dependency-based
129  * ordering.
130  */
131 void
132 sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs)
133 {
134         if (numObjs > 1)
135                 qsort((void *) objs, numObjs, sizeof(DumpableObject *),
136                           DOTypeNameCompare);
137 }
138
139 static int
140 DOTypeNameCompare(const void *p1, const void *p2)
141 {
142         DumpableObject *obj1 = *(DumpableObject *const *) p1;
143         DumpableObject *obj2 = *(DumpableObject *const *) p2;
144         int                     cmpval;
145
146         /* Sort by type */
147         cmpval = newObjectTypePriority[obj1->objType] -
148                 newObjectTypePriority[obj2->objType];
149
150         if (cmpval != 0)
151                 return cmpval;
152
153         /*
154          * Sort by namespace.  Note that all objects of the same type should
155          * either have or not have a namespace link, so we needn't be fancy about
156          * cases where one link is null and the other not.
157          */
158         if (obj1->namespace && obj2->namespace)
159         {
160                 cmpval = strcmp(obj1->namespace->dobj.name,
161                                                 obj2->namespace->dobj.name);
162                 if (cmpval != 0)
163                         return cmpval;
164         }
165
166         /* Sort by name */
167         cmpval = strcmp(obj1->name, obj2->name);
168         if (cmpval != 0)
169                 return cmpval;
170
171         /* To have a stable sort order, break ties for some object types */
172         if (obj1->objType == DO_FUNC || obj1->objType == DO_AGG)
173         {
174                 FuncInfo   *fobj1 = *(FuncInfo *const *) p1;
175                 FuncInfo   *fobj2 = *(FuncInfo *const *) p2;
176
177                 cmpval = fobj1->nargs - fobj2->nargs;
178                 if (cmpval != 0)
179                         return cmpval;
180         }
181         else if (obj1->objType == DO_OPERATOR)
182         {
183                 OprInfo    *oobj1 = *(OprInfo *const *) p1;
184                 OprInfo    *oobj2 = *(OprInfo *const *) p2;
185
186                 /* oprkind is 'l', 'r', or 'b'; this sorts prefix, postfix, infix */
187                 cmpval = (oobj2->oprkind - oobj1->oprkind);
188                 if (cmpval != 0)
189                         return cmpval;
190         }
191         else if (obj1->objType == DO_ATTRDEF)
192         {
193                 AttrDefInfo *adobj1 = *(AttrDefInfo *const *) p1;
194                 AttrDefInfo *adobj2 = *(AttrDefInfo *const *) p2;
195
196                 cmpval = (adobj1->adnum - adobj2->adnum);
197                 if (cmpval != 0)
198                         return cmpval;
199         }
200
201         /* Usually shouldn't get here, but if we do, sort by OID */
202         return oidcmp(obj1->catId.oid, obj2->catId.oid);
203 }
204
205
206 /*
207  * Sort the given objects into a type/OID-based ordering
208  *
209  * This is used with pre-7.3 source databases as a crude substitute for the
210  * lack of dependency information.
211  */
212 void
213 sortDumpableObjectsByTypeOid(DumpableObject **objs, int numObjs)
214 {
215         if (numObjs > 1)
216                 qsort((void *) objs, numObjs, sizeof(DumpableObject *),
217                           DOTypeOidCompare);
218 }
219
220 static int
221 DOTypeOidCompare(const void *p1, const void *p2)
222 {
223         DumpableObject *obj1 = *(DumpableObject *const *) p1;
224         DumpableObject *obj2 = *(DumpableObject *const *) p2;
225         int                     cmpval;
226
227         cmpval = oldObjectTypePriority[obj1->objType] -
228                 oldObjectTypePriority[obj2->objType];
229
230         if (cmpval != 0)
231                 return cmpval;
232
233         return oidcmp(obj1->catId.oid, obj2->catId.oid);
234 }
235
236
237 /*
238  * Sort the given objects into a safe dump order using dependency
239  * information (to the extent we have it available).
240  */
241 void
242 sortDumpableObjects(DumpableObject **objs, int numObjs)
243 {
244         DumpableObject **ordering;
245         int                     nOrdering;
246
247         if (numObjs <= 0)
248                 return;
249
250         ordering = (DumpableObject **) pg_malloc(numObjs * sizeof(DumpableObject *));
251         while (!TopoSort(objs, numObjs, ordering, &nOrdering))
252                 findDependencyLoops(ordering, nOrdering, numObjs);
253
254         memcpy(objs, ordering, numObjs * sizeof(DumpableObject *));
255
256         free(ordering);
257 }
258
259 /*
260  * TopoSort -- topological sort of a dump list
261  *
262  * Generate a re-ordering of the dump list that satisfies all the dependency
263  * constraints shown in the dump list.  (Each such constraint is a fact of a
264  * partial ordering.)  Minimize rearrangement of the list not needed to
265  * achieve the partial ordering.
266  *
267  * The input is the list of numObjs objects in objs[].  This list is not
268  * modified.
269  *
270  * Returns TRUE if able to build an ordering that satisfies all the
271  * constraints, FALSE if not (there are contradictory constraints).
272  *
273  * On success (TRUE result), ordering[] is filled with a sorted array of
274  * DumpableObject pointers, of length equal to the input list length.
275  *
276  * On failure (FALSE result), ordering[] is filled with an unsorted array of
277  * DumpableObject pointers of length *nOrdering, listing the objects that
278  * prevented the sort from being completed.  In general, these objects either
279  * participate directly in a dependency cycle, or are depended on by objects
280  * that are in a cycle.  (The latter objects are not actually problematic,
281  * but it takes further analysis to identify which are which.)
282  *
283  * The caller is responsible for allocating sufficient space at *ordering.
284  */
285 static bool
286 TopoSort(DumpableObject **objs,
287                  int numObjs,
288                  DumpableObject **ordering,             /* output argument */
289                  int *nOrdering)                /* output argument */
290 {
291         DumpId          maxDumpId = getMaxDumpId();
292         int                *pendingHeap;
293         int                *beforeConstraints;
294         int                *idMap;
295         DumpableObject *obj;
296         int                     heapLength;
297         int                     i,
298                                 j,
299                                 k;
300
301         /*
302          * This is basically the same algorithm shown for topological sorting in
303          * Knuth's Volume 1.  However, we would like to minimize unnecessary
304          * rearrangement of the input ordering; that is, when we have a choice of
305          * which item to output next, we always want to take the one highest in
306          * the original list.  Therefore, instead of maintaining an unordered
307          * linked list of items-ready-to-output as Knuth does, we maintain a heap
308          * of their item numbers, which we can use as a priority queue.  This
309          * turns the algorithm from O(N) to O(N log N) because each insertion or
310          * removal of a heap item takes O(log N) time.  However, that's still
311          * plenty fast enough for this application.
312          */
313
314         *nOrdering = numObjs;           /* for success return */
315
316         /* Eliminate the null case */
317         if (numObjs <= 0)
318                 return true;
319
320         /* Create workspace for the above-described heap */
321         pendingHeap = (int *) pg_malloc(numObjs * sizeof(int));
322
323         /*
324          * Scan the constraints, and for each item in the input, generate a count
325          * of the number of constraints that say it must be before something else.
326          * The count for the item with dumpId j is stored in beforeConstraints[j].
327          * We also make a map showing the input-order index of the item with
328          * dumpId j.
329          */
330         beforeConstraints = (int *) pg_malloc((maxDumpId + 1) * sizeof(int));
331         memset(beforeConstraints, 0, (maxDumpId + 1) * sizeof(int));
332         idMap = (int *) pg_malloc((maxDumpId + 1) * sizeof(int));
333         for (i = 0; i < numObjs; i++)
334         {
335                 obj = objs[i];
336                 j = obj->dumpId;
337                 if (j <= 0 || j > maxDumpId)
338                         exit_horribly(modulename, "invalid dumpId %d\n", j);
339                 idMap[j] = i;
340                 for (j = 0; j < obj->nDeps; j++)
341                 {
342                         k = obj->dependencies[j];
343                         if (k <= 0 || k > maxDumpId)
344                                 exit_horribly(modulename, "invalid dependency %d\n", k);
345                         beforeConstraints[k]++;
346                 }
347         }
348
349         /*
350          * Now initialize the heap of items-ready-to-output by filling it with the
351          * indexes of items that already have beforeConstraints[id] == 0.
352          *
353          * The essential property of a heap is heap[(j-1)/2] >= heap[j] for each j
354          * in the range 1..heapLength-1 (note we are using 0-based subscripts
355          * here, while the discussion in Knuth assumes 1-based subscripts). So, if
356          * we simply enter the indexes into pendingHeap[] in decreasing order, we
357          * a-fortiori have the heap invariant satisfied at completion of this
358          * loop, and don't need to do any sift-up comparisons.
359          */
360         heapLength = 0;
361         for (i = numObjs; --i >= 0;)
362         {
363                 if (beforeConstraints[objs[i]->dumpId] == 0)
364                         pendingHeap[heapLength++] = i;
365         }
366
367         /*--------------------
368          * Now emit objects, working backwards in the output list.      At each step,
369          * we use the priority heap to select the last item that has no remaining
370          * before-constraints.  We remove that item from the heap, output it to
371          * ordering[], and decrease the beforeConstraints count of each of the
372          * items it was constrained against.  Whenever an item's beforeConstraints
373          * count is thereby decreased to zero, we insert it into the priority heap
374          * to show that it is a candidate to output.  We are done when the heap
375          * becomes empty; if we have output every element then we succeeded,
376          * otherwise we failed.
377          * i = number of ordering[] entries left to output
378          * j = objs[] index of item we are outputting
379          * k = temp for scanning constraint list for item j
380          *--------------------
381          */
382         i = numObjs;
383         while (heapLength > 0)
384         {
385                 /* Select object to output by removing largest heap member */
386                 j = removeHeapElement(pendingHeap, heapLength--);
387                 obj = objs[j];
388                 /* Output candidate to ordering[] */
389                 ordering[--i] = obj;
390                 /* Update beforeConstraints counts of its predecessors */
391                 for (k = 0; k < obj->nDeps; k++)
392                 {
393                         int                     id = obj->dependencies[k];
394
395                         if ((--beforeConstraints[id]) == 0)
396                                 addHeapElement(idMap[id], pendingHeap, heapLength++);
397                 }
398         }
399
400         /*
401          * If we failed, report the objects that couldn't be output; these are the
402          * ones with beforeConstraints[] still nonzero.
403          */
404         if (i != 0)
405         {
406                 k = 0;
407                 for (j = 1; j <= maxDumpId; j++)
408                 {
409                         if (beforeConstraints[j] != 0)
410                                 ordering[k++] = objs[idMap[j]];
411                 }
412                 *nOrdering = k;
413         }
414
415         /* Done */
416         free(pendingHeap);
417         free(beforeConstraints);
418         free(idMap);
419
420         return (i == 0);
421 }
422
423 /*
424  * Add an item to a heap (priority queue)
425  *
426  * heapLength is the current heap size; caller is responsible for increasing
427  * its value after the call.  There must be sufficient storage at *heap.
428  */
429 static void
430 addHeapElement(int val, int *heap, int heapLength)
431 {
432         int                     j;
433
434         /*
435          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
436          * using 1-based array indexes, not 0-based.
437          */
438         j = heapLength;
439         while (j > 0)
440         {
441                 int                     i = (j - 1) >> 1;
442
443                 if (val <= heap[i])
444                         break;
445                 heap[j] = heap[i];
446                 j = i;
447         }
448         heap[j] = val;
449 }
450
451 /*
452  * Remove the largest item present in a heap (priority queue)
453  *
454  * heapLength is the current heap size; caller is responsible for decreasing
455  * its value after the call.
456  *
457  * We remove and return heap[0], which is always the largest element of
458  * the heap, and then "sift up" to maintain the heap invariant.
459  */
460 static int
461 removeHeapElement(int *heap, int heapLength)
462 {
463         int                     result = heap[0];
464         int                     val;
465         int                     i;
466
467         if (--heapLength <= 0)
468                 return result;
469         val = heap[heapLength];         /* value that must be reinserted */
470         i = 0;                                          /* i is where the "hole" is */
471         for (;;)
472         {
473                 int                     j = 2 * i + 1;
474
475                 if (j >= heapLength)
476                         break;
477                 if (j + 1 < heapLength &&
478                         heap[j] < heap[j + 1])
479                         j++;
480                 if (val >= heap[j])
481                         break;
482                 heap[i] = heap[j];
483                 i = j;
484         }
485         heap[i] = val;
486         return result;
487 }
488
489 /*
490  * findDependencyLoops - identify loops in TopoSort's failure output,
491  *              and pass each such loop to repairDependencyLoop() for action
492  *
493  * In general there may be many loops in the set of objects returned by
494  * TopoSort; for speed we should try to repair as many loops as we can
495  * before trying TopoSort again.  We can safely repair loops that are
496  * disjoint (have no members in common); if we find overlapping loops
497  * then we repair only the first one found, because the action taken to
498  * repair the first might have repaired the other as well.      (If not,
499  * we'll fix it on the next go-round.)
500  *
501  * objs[] lists the objects TopoSort couldn't sort
502  * nObjs is the number of such objects
503  * totObjs is the total number of objects in the universe
504  */
505 static void
506 findDependencyLoops(DumpableObject **objs, int nObjs, int totObjs)
507 {
508         /*
509          * We use two data structures here.  One is a bool array processed[],
510          * which is indexed by dump ID and marks the objects already processed
511          * during this invocation of findDependencyLoops().  The other is a
512          * workspace[] array of DumpableObject pointers, in which we try to build
513          * lists of objects constituting loops.  We make workspace[] large enough
514          * to hold all the objects, which is huge overkill in most cases but could
515          * theoretically be necessary if there is a single dependency chain
516          * linking all the objects.
517          */
518         bool       *processed;
519         DumpableObject **workspace;
520         bool            fixedloop;
521         int                     i;
522
523         processed = (bool *) pg_calloc(getMaxDumpId() + 1, sizeof(bool));
524         workspace = (DumpableObject **) pg_malloc(totObjs * sizeof(DumpableObject *));
525         fixedloop = false;
526
527         for (i = 0; i < nObjs; i++)
528         {
529                 DumpableObject *obj = objs[i];
530                 int                     looplen;
531                 int                     j;
532
533                 looplen = findLoop(obj, obj->dumpId, processed, workspace, 0);
534
535                 if (looplen > 0)
536                 {
537                         /* Found a loop, repair it */
538                         repairDependencyLoop(workspace, looplen);
539                         fixedloop = true;
540                         /* Mark loop members as processed */
541                         for (j = 0; j < looplen; j++)
542                                 processed[workspace[j]->dumpId] = true;
543                 }
544                 else
545                 {
546                         /*
547                          * There's no loop starting at this object, but mark it processed
548                          * anyway.      This is not necessary for correctness, but saves later
549                          * invocations of findLoop() from uselessly chasing references to
550                          * such an object.
551                          */
552                         processed[obj->dumpId] = true;
553                 }
554         }
555
556         /* We'd better have fixed at least one loop */
557         if (!fixedloop)
558                 exit_horribly(modulename, "could not identify dependency loop\n");
559
560         free(workspace);
561         free(processed);
562 }
563
564 /*
565  * Recursively search for a circular dependency loop that doesn't include
566  * any already-processed objects.
567  *
568  *      obj: object we are examining now
569  *      startPoint: dumpId of starting object for the hoped-for circular loop
570  *      processed[]: flag array marking already-processed objects
571  *      workspace[]: work array in which we are building list of loop members
572  *      depth: number of valid entries in workspace[] at call
573  *
574  * On success, the length of the loop is returned, and workspace[] is filled
575  * with pointers to the members of the loop.  On failure, we return 0.
576  *
577  * Note: it is possible that the given starting object is a member of more
578  * than one cycle; if so, we will find an arbitrary one of the cycles.
579  */
580 static int
581 findLoop(DumpableObject *obj,
582                  DumpId startPoint,
583                  bool *processed,
584                  DumpableObject **workspace,
585                  int depth)
586 {
587         int                     i;
588
589         /*
590          * Reject if obj is already processed.  This test prevents us from finding
591          * loops that overlap previously-processed loops.
592          */
593         if (processed[obj->dumpId])
594                 return 0;
595
596         /*
597          * Reject if obj is already present in workspace.  This test prevents us
598          * from going into infinite recursion if we are given a startPoint object
599          * that links to a cycle it's not a member of, and it guarantees that we
600          * can't overflow the allocated size of workspace[].
601          */
602         for (i = 0; i < depth; i++)
603         {
604                 if (workspace[i] == obj)
605                         return 0;
606         }
607
608         /*
609          * Okay, tentatively add obj to workspace
610          */
611         workspace[depth++] = obj;
612
613         /*
614          * See if we've found a loop back to the desired startPoint; if so, done
615          */
616         for (i = 0; i < obj->nDeps; i++)
617         {
618                 if (obj->dependencies[i] == startPoint)
619                         return depth;
620         }
621
622         /*
623          * Recurse down each outgoing branch
624          */
625         for (i = 0; i < obj->nDeps; i++)
626         {
627                 DumpableObject *nextobj = findObjectByDumpId(obj->dependencies[i]);
628                 int                     newDepth;
629
630                 if (!nextobj)
631                         continue;                       /* ignore dependencies on undumped objects */
632                 newDepth = findLoop(nextobj,
633                                                         startPoint,
634                                                         processed,
635                                                         workspace,
636                                                         depth);
637                 if (newDepth > 0)
638                         return newDepth;
639         }
640
641         return 0;
642 }
643
644 /*
645  * A user-defined datatype will have a dependency loop with each of its
646  * I/O functions (since those have the datatype as input or output).
647  * Similarly, a range type will have a loop with its canonicalize function,
648  * if any.      Break the loop by making the function depend on the associated
649  * shell type, instead.
650  */
651 static void
652 repairTypeFuncLoop(DumpableObject *typeobj, DumpableObject *funcobj)
653 {
654         TypeInfo   *typeInfo = (TypeInfo *) typeobj;
655
656         /* remove function's dependency on type */
657         removeObjectDependency(funcobj, typeobj->dumpId);
658
659         /* add function's dependency on shell type, instead */
660         if (typeInfo->shellType)
661         {
662                 addObjectDependency(funcobj, typeInfo->shellType->dobj.dumpId);
663                 /* Mark shell type as to be dumped if any such function is */
664                 if (funcobj->dump)
665                         typeInfo->shellType->dobj.dump = true;
666         }
667 }
668
669 /*
670  * Because we force a view to depend on its ON SELECT rule, while there
671  * will be an implicit dependency in the other direction, we need to break
672  * the loop.  If there are no other objects in the loop then we can remove
673  * the implicit dependency and leave the ON SELECT rule non-separate.
674  */
675 static void
676 repairViewRuleLoop(DumpableObject *viewobj,
677                                    DumpableObject *ruleobj)
678 {
679         /* remove rule's dependency on view */
680         removeObjectDependency(ruleobj, viewobj->dumpId);
681 }
682
683 /*
684  * However, if there are other objects in the loop, we must break the loop
685  * by making the ON SELECT rule a separately-dumped object.
686  *
687  * Because findLoop() finds shorter cycles before longer ones, it's likely
688  * that we will have previously fired repairViewRuleLoop() and removed the
689  * rule's dependency on the view.  Put it back to ensure the rule won't be
690  * emitted before the view...
691  */
692 static void
693 repairViewRuleMultiLoop(DumpableObject *viewobj,
694                                                 DumpableObject *ruleobj)
695 {
696         /* remove view's dependency on rule */
697         removeObjectDependency(viewobj, ruleobj->dumpId);
698         /* pretend view is a plain table and dump it that way */
699         ((TableInfo *) viewobj)->relkind = 'r';         /* RELKIND_RELATION */
700         /* mark rule as needing its own dump */
701         ((RuleInfo *) ruleobj)->separate = true;
702         /* put back rule's dependency on view */
703         addObjectDependency(ruleobj, viewobj->dumpId);
704 }
705
706 /*
707  * Because we make tables depend on their CHECK constraints, while there
708  * will be an automatic dependency in the other direction, we need to break
709  * the loop.  If there are no other objects in the loop then we can remove
710  * the automatic dependency and leave the CHECK constraint non-separate.
711  */
712 static void
713 repairTableConstraintLoop(DumpableObject *tableobj,
714                                                   DumpableObject *constraintobj)
715 {
716         /* remove constraint's dependency on table */
717         removeObjectDependency(constraintobj, tableobj->dumpId);
718 }
719
720 /*
721  * However, if there are other objects in the loop, we must break the loop
722  * by making the CHECK constraint a separately-dumped object.
723  *
724  * Because findLoop() finds shorter cycles before longer ones, it's likely
725  * that we will have previously fired repairTableConstraintLoop() and
726  * removed the constraint's dependency on the table.  Put it back to ensure
727  * the constraint won't be emitted before the table...
728  */
729 static void
730 repairTableConstraintMultiLoop(DumpableObject *tableobj,
731                                                            DumpableObject *constraintobj)
732 {
733         /* remove table's dependency on constraint */
734         removeObjectDependency(tableobj, constraintobj->dumpId);
735         /* mark constraint as needing its own dump */
736         ((ConstraintInfo *) constraintobj)->separate = true;
737         /* put back constraint's dependency on table */
738         addObjectDependency(constraintobj, tableobj->dumpId);
739 }
740
741 /*
742  * Attribute defaults behave exactly the same as CHECK constraints...
743  */
744 static void
745 repairTableAttrDefLoop(DumpableObject *tableobj,
746                                            DumpableObject *attrdefobj)
747 {
748         /* remove attrdef's dependency on table */
749         removeObjectDependency(attrdefobj, tableobj->dumpId);
750 }
751
752 static void
753 repairTableAttrDefMultiLoop(DumpableObject *tableobj,
754                                                         DumpableObject *attrdefobj)
755 {
756         /* remove table's dependency on attrdef */
757         removeObjectDependency(tableobj, attrdefobj->dumpId);
758         /* mark attrdef as needing its own dump */
759         ((AttrDefInfo *) attrdefobj)->separate = true;
760         /* put back attrdef's dependency on table */
761         addObjectDependency(attrdefobj, tableobj->dumpId);
762 }
763
764 /*
765  * CHECK constraints on domains work just like those on tables ...
766  */
767 static void
768 repairDomainConstraintLoop(DumpableObject *domainobj,
769                                                    DumpableObject *constraintobj)
770 {
771         /* remove constraint's dependency on domain */
772         removeObjectDependency(constraintobj, domainobj->dumpId);
773 }
774
775 static void
776 repairDomainConstraintMultiLoop(DumpableObject *domainobj,
777                                                                 DumpableObject *constraintobj)
778 {
779         /* remove domain's dependency on constraint */
780         removeObjectDependency(domainobj, constraintobj->dumpId);
781         /* mark constraint as needing its own dump */
782         ((ConstraintInfo *) constraintobj)->separate = true;
783         /* put back constraint's dependency on domain */
784         addObjectDependency(constraintobj, domainobj->dumpId);
785 }
786
787 /*
788  * Fix a dependency loop, or die trying ...
789  *
790  * This routine is mainly concerned with reducing the multiple ways that
791  * a loop might appear to common cases, which it passes off to the
792  * "fixer" routines above.
793  */
794 static void
795 repairDependencyLoop(DumpableObject **loop,
796                                          int nLoop)
797 {
798         int                     i,
799                                 j;
800
801         /* Datatype and one of its I/O or canonicalize functions */
802         if (nLoop == 2 &&
803                 loop[0]->objType == DO_TYPE &&
804                 loop[1]->objType == DO_FUNC)
805         {
806                 repairTypeFuncLoop(loop[0], loop[1]);
807                 return;
808         }
809         if (nLoop == 2 &&
810                 loop[1]->objType == DO_TYPE &&
811                 loop[0]->objType == DO_FUNC)
812         {
813                 repairTypeFuncLoop(loop[1], loop[0]);
814                 return;
815         }
816
817         /* View and its ON SELECT rule */
818         if (nLoop == 2 &&
819                 loop[0]->objType == DO_TABLE &&
820                 loop[1]->objType == DO_RULE &&
821                 ((RuleInfo *) loop[1])->ev_type == '1' &&
822                 ((RuleInfo *) loop[1])->is_instead &&
823                 ((RuleInfo *) loop[1])->ruletable == (TableInfo *) loop[0])
824         {
825                 repairViewRuleLoop(loop[0], loop[1]);
826                 return;
827         }
828         if (nLoop == 2 &&
829                 loop[1]->objType == DO_TABLE &&
830                 loop[0]->objType == DO_RULE &&
831                 ((RuleInfo *) loop[0])->ev_type == '1' &&
832                 ((RuleInfo *) loop[0])->is_instead &&
833                 ((RuleInfo *) loop[0])->ruletable == (TableInfo *) loop[1])
834         {
835                 repairViewRuleLoop(loop[1], loop[0]);
836                 return;
837         }
838
839         /* Indirect loop involving view and ON SELECT rule */
840         if (nLoop > 2)
841         {
842                 for (i = 0; i < nLoop; i++)
843                 {
844                         if (loop[i]->objType == DO_TABLE)
845                         {
846                                 for (j = 0; j < nLoop; j++)
847                                 {
848                                         if (loop[j]->objType == DO_RULE &&
849                                                 ((RuleInfo *) loop[j])->ev_type == '1' &&
850                                                 ((RuleInfo *) loop[j])->is_instead &&
851                                                 ((RuleInfo *) loop[j])->ruletable == (TableInfo *) loop[i])
852                                         {
853                                                 repairViewRuleMultiLoop(loop[i], loop[j]);
854                                                 return;
855                                         }
856                                 }
857                         }
858                 }
859         }
860
861         /* Table and CHECK constraint */
862         if (nLoop == 2 &&
863                 loop[0]->objType == DO_TABLE &&
864                 loop[1]->objType == DO_CONSTRAINT &&
865                 ((ConstraintInfo *) loop[1])->contype == 'c' &&
866                 ((ConstraintInfo *) loop[1])->contable == (TableInfo *) loop[0])
867         {
868                 repairTableConstraintLoop(loop[0], loop[1]);
869                 return;
870         }
871         if (nLoop == 2 &&
872                 loop[1]->objType == DO_TABLE &&
873                 loop[0]->objType == DO_CONSTRAINT &&
874                 ((ConstraintInfo *) loop[0])->contype == 'c' &&
875                 ((ConstraintInfo *) loop[0])->contable == (TableInfo *) loop[1])
876         {
877                 repairTableConstraintLoop(loop[1], loop[0]);
878                 return;
879         }
880
881         /* Indirect loop involving table and CHECK constraint */
882         if (nLoop > 2)
883         {
884                 for (i = 0; i < nLoop; i++)
885                 {
886                         if (loop[i]->objType == DO_TABLE)
887                         {
888                                 for (j = 0; j < nLoop; j++)
889                                 {
890                                         if (loop[j]->objType == DO_CONSTRAINT &&
891                                                 ((ConstraintInfo *) loop[j])->contype == 'c' &&
892                                                 ((ConstraintInfo *) loop[j])->contable == (TableInfo *) loop[i])
893                                         {
894                                                 repairTableConstraintMultiLoop(loop[i], loop[j]);
895                                                 return;
896                                         }
897                                 }
898                         }
899                 }
900         }
901
902         /* Table and attribute default */
903         if (nLoop == 2 &&
904                 loop[0]->objType == DO_TABLE &&
905                 loop[1]->objType == DO_ATTRDEF &&
906                 ((AttrDefInfo *) loop[1])->adtable == (TableInfo *) loop[0])
907         {
908                 repairTableAttrDefLoop(loop[0], loop[1]);
909                 return;
910         }
911         if (nLoop == 2 &&
912                 loop[1]->objType == DO_TABLE &&
913                 loop[0]->objType == DO_ATTRDEF &&
914                 ((AttrDefInfo *) loop[0])->adtable == (TableInfo *) loop[1])
915         {
916                 repairTableAttrDefLoop(loop[1], loop[0]);
917                 return;
918         }
919
920         /* Indirect loop involving table and attribute default */
921         if (nLoop > 2)
922         {
923                 for (i = 0; i < nLoop; i++)
924                 {
925                         if (loop[i]->objType == DO_TABLE)
926                         {
927                                 for (j = 0; j < nLoop; j++)
928                                 {
929                                         if (loop[j]->objType == DO_ATTRDEF &&
930                                                 ((AttrDefInfo *) loop[j])->adtable == (TableInfo *) loop[i])
931                                         {
932                                                 repairTableAttrDefMultiLoop(loop[i], loop[j]);
933                                                 return;
934                                         }
935                                 }
936                         }
937                 }
938         }
939
940         /* Domain and CHECK constraint */
941         if (nLoop == 2 &&
942                 loop[0]->objType == DO_TYPE &&
943                 loop[1]->objType == DO_CONSTRAINT &&
944                 ((ConstraintInfo *) loop[1])->contype == 'c' &&
945                 ((ConstraintInfo *) loop[1])->condomain == (TypeInfo *) loop[0])
946         {
947                 repairDomainConstraintLoop(loop[0], loop[1]);
948                 return;
949         }
950         if (nLoop == 2 &&
951                 loop[1]->objType == DO_TYPE &&
952                 loop[0]->objType == DO_CONSTRAINT &&
953                 ((ConstraintInfo *) loop[0])->contype == 'c' &&
954                 ((ConstraintInfo *) loop[0])->condomain == (TypeInfo *) loop[1])
955         {
956                 repairDomainConstraintLoop(loop[1], loop[0]);
957                 return;
958         }
959
960         /* Indirect loop involving domain and CHECK constraint */
961         if (nLoop > 2)
962         {
963                 for (i = 0; i < nLoop; i++)
964                 {
965                         if (loop[i]->objType == DO_TYPE)
966                         {
967                                 for (j = 0; j < nLoop; j++)
968                                 {
969                                         if (loop[j]->objType == DO_CONSTRAINT &&
970                                                 ((ConstraintInfo *) loop[j])->contype == 'c' &&
971                                                 ((ConstraintInfo *) loop[j])->condomain == (TypeInfo *) loop[i])
972                                         {
973                                                 repairDomainConstraintMultiLoop(loop[i], loop[j]);
974                                                 return;
975                                         }
976                                 }
977                         }
978                 }
979         }
980
981         /*
982          * If all the objects are TABLE_DATA items, what we must have is a
983          * circular set of foreign key constraints (or a single self-referential
984          * table).      Print an appropriate complaint and break the loop arbitrarily.
985          */
986         for (i = 0; i < nLoop; i++)
987         {
988                 if (loop[i]->objType != DO_TABLE_DATA)
989                         break;
990         }
991         if (i >= nLoop)
992         {
993                 write_msg(NULL, "NOTICE: there are circular foreign-key constraints among these table(s):\n");
994                 for (i = 0; i < nLoop; i++)
995                         write_msg(NULL, "  %s\n", loop[i]->name);
996                 write_msg(NULL, "You might not be able to restore the dump without using --disable-triggers or temporarily dropping the constraints.\n");
997                 write_msg(NULL, "Consider using a full dump instead of a --data-only dump to avoid this problem.\n");
998                 if (nLoop > 1)
999                         removeObjectDependency(loop[0], loop[1]->dumpId);
1000                 else    /* must be a self-dependency */
1001                         removeObjectDependency(loop[0], loop[0]->dumpId);
1002                 return;
1003         }
1004
1005         /*
1006          * If we can't find a principled way to break the loop, complain and break
1007          * it in an arbitrary fashion.
1008          */
1009         write_msg(modulename, "WARNING: could not resolve dependency loop among these items:\n");
1010         for (i = 0; i < nLoop; i++)
1011         {
1012                 char            buf[1024];
1013
1014                 describeDumpableObject(loop[i], buf, sizeof(buf));
1015                 write_msg(modulename, "  %s\n", buf);
1016         }
1017
1018         if (nLoop > 1)
1019                 removeObjectDependency(loop[0], loop[1]->dumpId);
1020         else    /* must be a self-dependency */
1021                 removeObjectDependency(loop[0], loop[0]->dumpId);
1022 }
1023
1024 /*
1025  * Describe a dumpable object usefully for errors
1026  *
1027  * This should probably go somewhere else...
1028  */
1029 static void
1030 describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
1031 {
1032         switch (obj->objType)
1033         {
1034                 case DO_NAMESPACE:
1035                         snprintf(buf, bufsize,
1036                                          "SCHEMA %s  (ID %d OID %u)",
1037                                          obj->name, obj->dumpId, obj->catId.oid);
1038                         return;
1039                 case DO_EXTENSION:
1040                         snprintf(buf, bufsize,
1041                                          "EXTENSION %s  (ID %d OID %u)",
1042                                          obj->name, obj->dumpId, obj->catId.oid);
1043                         return;
1044                 case DO_TYPE:
1045                         snprintf(buf, bufsize,
1046                                          "TYPE %s  (ID %d OID %u)",
1047                                          obj->name, obj->dumpId, obj->catId.oid);
1048                         return;
1049                 case DO_SHELL_TYPE:
1050                         snprintf(buf, bufsize,
1051                                          "SHELL TYPE %s  (ID %d OID %u)",
1052                                          obj->name, obj->dumpId, obj->catId.oid);
1053                         return;
1054                 case DO_FUNC:
1055                         snprintf(buf, bufsize,
1056                                          "FUNCTION %s  (ID %d OID %u)",
1057                                          obj->name, obj->dumpId, obj->catId.oid);
1058                         return;
1059                 case DO_AGG:
1060                         snprintf(buf, bufsize,
1061                                          "AGGREGATE %s  (ID %d OID %u)",
1062                                          obj->name, obj->dumpId, obj->catId.oid);
1063                         return;
1064                 case DO_OPERATOR:
1065                         snprintf(buf, bufsize,
1066                                          "OPERATOR %s  (ID %d OID %u)",
1067                                          obj->name, obj->dumpId, obj->catId.oid);
1068                         return;
1069                 case DO_OPCLASS:
1070                         snprintf(buf, bufsize,
1071                                          "OPERATOR CLASS %s  (ID %d OID %u)",
1072                                          obj->name, obj->dumpId, obj->catId.oid);
1073                         return;
1074                 case DO_OPFAMILY:
1075                         snprintf(buf, bufsize,
1076                                          "OPERATOR FAMILY %s  (ID %d OID %u)",
1077                                          obj->name, obj->dumpId, obj->catId.oid);
1078                         return;
1079                 case DO_COLLATION:
1080                         snprintf(buf, bufsize,
1081                                          "COLLATION %s  (ID %d OID %u)",
1082                                          obj->name, obj->dumpId, obj->catId.oid);
1083                         return;
1084                 case DO_CONVERSION:
1085                         snprintf(buf, bufsize,
1086                                          "CONVERSION %s  (ID %d OID %u)",
1087                                          obj->name, obj->dumpId, obj->catId.oid);
1088                         return;
1089                 case DO_TABLE:
1090                         snprintf(buf, bufsize,
1091                                          "TABLE %s  (ID %d OID %u)",
1092                                          obj->name, obj->dumpId, obj->catId.oid);
1093                         return;
1094                 case DO_ATTRDEF:
1095                         snprintf(buf, bufsize,
1096                                          "ATTRDEF %s.%s  (ID %d OID %u)",
1097                                          ((AttrDefInfo *) obj)->adtable->dobj.name,
1098                                          ((AttrDefInfo *) obj)->adtable->attnames[((AttrDefInfo *) obj)->adnum - 1],
1099                                          obj->dumpId, obj->catId.oid);
1100                         return;
1101                 case DO_INDEX:
1102                         snprintf(buf, bufsize,
1103                                          "INDEX %s  (ID %d OID %u)",
1104                                          obj->name, obj->dumpId, obj->catId.oid);
1105                         return;
1106                 case DO_RULE:
1107                         snprintf(buf, bufsize,
1108                                          "RULE %s  (ID %d OID %u)",
1109                                          obj->name, obj->dumpId, obj->catId.oid);
1110                         return;
1111                 case DO_TRIGGER:
1112                         snprintf(buf, bufsize,
1113                                          "TRIGGER %s  (ID %d OID %u)",
1114                                          obj->name, obj->dumpId, obj->catId.oid);
1115                         return;
1116                 case DO_CONSTRAINT:
1117                         snprintf(buf, bufsize,
1118                                          "CONSTRAINT %s  (ID %d OID %u)",
1119                                          obj->name, obj->dumpId, obj->catId.oid);
1120                         return;
1121                 case DO_FK_CONSTRAINT:
1122                         snprintf(buf, bufsize,
1123                                          "FK CONSTRAINT %s  (ID %d OID %u)",
1124                                          obj->name, obj->dumpId, obj->catId.oid);
1125                         return;
1126                 case DO_PROCLANG:
1127                         snprintf(buf, bufsize,
1128                                          "PROCEDURAL LANGUAGE %s  (ID %d OID %u)",
1129                                          obj->name, obj->dumpId, obj->catId.oid);
1130                         return;
1131                 case DO_CAST:
1132                         snprintf(buf, bufsize,
1133                                          "CAST %u to %u  (ID %d OID %u)",
1134                                          ((CastInfo *) obj)->castsource,
1135                                          ((CastInfo *) obj)->casttarget,
1136                                          obj->dumpId, obj->catId.oid);
1137                         return;
1138                 case DO_TABLE_DATA:
1139                         snprintf(buf, bufsize,
1140                                          "TABLE DATA %s  (ID %d OID %u)",
1141                                          obj->name, obj->dumpId, obj->catId.oid);
1142                         return;
1143                 case DO_DUMMY_TYPE:
1144                         snprintf(buf, bufsize,
1145                                          "DUMMY TYPE %s  (ID %d OID %u)",
1146                                          obj->name, obj->dumpId, obj->catId.oid);
1147                         return;
1148                 case DO_TSPARSER:
1149                         snprintf(buf, bufsize,
1150                                          "TEXT SEARCH PARSER %s  (ID %d OID %u)",
1151                                          obj->name, obj->dumpId, obj->catId.oid);
1152                         return;
1153                 case DO_TSDICT:
1154                         snprintf(buf, bufsize,
1155                                          "TEXT SEARCH DICTIONARY %s  (ID %d OID %u)",
1156                                          obj->name, obj->dumpId, obj->catId.oid);
1157                         return;
1158                 case DO_TSTEMPLATE:
1159                         snprintf(buf, bufsize,
1160                                          "TEXT SEARCH TEMPLATE %s  (ID %d OID %u)",
1161                                          obj->name, obj->dumpId, obj->catId.oid);
1162                         return;
1163                 case DO_TSCONFIG:
1164                         snprintf(buf, bufsize,
1165                                          "TEXT SEARCH CONFIGURATION %s  (ID %d OID %u)",
1166                                          obj->name, obj->dumpId, obj->catId.oid);
1167                         return;
1168                 case DO_FDW:
1169                         snprintf(buf, bufsize,
1170                                          "FOREIGN DATA WRAPPER %s  (ID %d OID %u)",
1171                                          obj->name, obj->dumpId, obj->catId.oid);
1172                         return;
1173                 case DO_FOREIGN_SERVER:
1174                         snprintf(buf, bufsize,
1175                                          "FOREIGN SERVER %s  (ID %d OID %u)",
1176                                          obj->name, obj->dumpId, obj->catId.oid);
1177                         return;
1178                 case DO_DEFAULT_ACL:
1179                         snprintf(buf, bufsize,
1180                                          "DEFAULT ACL %s  (ID %d OID %u)",
1181                                          obj->name, obj->dumpId, obj->catId.oid);
1182                         return;
1183                 case DO_BLOB:
1184                         snprintf(buf, bufsize,
1185                                          "BLOB  (ID %d OID %u)",
1186                                          obj->dumpId, obj->catId.oid);
1187                         return;
1188                 case DO_BLOB_DATA:
1189                         snprintf(buf, bufsize,
1190                                          "BLOB DATA  (ID %d)",
1191                                          obj->dumpId);
1192                         return;
1193         }
1194         /* shouldn't get here */
1195         snprintf(buf, bufsize,
1196                          "object type %d  (ID %d OID %u)",
1197                          (int) obj->objType,
1198                          obj->dumpId, obj->catId.oid);
1199 }