]> granicus.if.org Git - postgresql/blob - src/backend/catalog/dependency.c
Update copyright for 2006. Update scripts.
[postgresql] / src / backend / catalog / dependency.c
1 /*-------------------------------------------------------------------------
2  *
3  * dependency.c
4  *        Routines to support inter-object dependencies.
5  *
6  *
7  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.50 2006/03/05 15:58:22 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "catalog/dependency.h"
20 #include "catalog/heap.h"
21 #include "catalog/index.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/pg_attrdef.h"
25 #include "catalog/pg_authid.h"
26 #include "catalog/pg_cast.h"
27 #include "catalog/pg_constraint.h"
28 #include "catalog/pg_conversion.h"
29 #include "catalog/pg_database.h"
30 #include "catalog/pg_depend.h"
31 #include "catalog/pg_language.h"
32 #include "catalog/pg_namespace.h"
33 #include "catalog/pg_opclass.h"
34 #include "catalog/pg_operator.h"
35 #include "catalog/pg_proc.h"
36 #include "catalog/pg_rewrite.h"
37 #include "catalog/pg_tablespace.h"
38 #include "catalog/pg_trigger.h"
39 #include "catalog/pg_type.h"
40 #include "commands/comment.h"
41 #include "commands/dbcommands.h"
42 #include "commands/defrem.h"
43 #include "commands/proclang.h"
44 #include "commands/schemacmds.h"
45 #include "commands/tablespace.h"
46 #include "commands/trigger.h"
47 #include "commands/typecmds.h"
48 #include "lib/stringinfo.h"
49 #include "miscadmin.h"
50 #include "optimizer/clauses.h"
51 #include "parser/parsetree.h"
52 #include "rewrite/rewriteRemove.h"
53 #include "utils/builtins.h"
54 #include "utils/fmgroids.h"
55 #include "utils/lsyscache.h"
56 #include "utils/syscache.h"
57
58
59 /* expansible list of ObjectAddresses */
60 typedef struct
61 {
62         ObjectAddress *refs;            /* => palloc'd array */
63         int                     numrefs;                /* current number of references */
64         int                     maxrefs;                /* current size of palloc'd array */
65 } ObjectAddresses;
66
67 /* for find_expr_references_walker */
68 typedef struct
69 {
70         ObjectAddresses addrs;          /* addresses being accumulated */
71         List       *rtables;            /* list of rangetables to resolve Vars */
72 } find_expr_references_context;
73
74 /*
75  * This constant table maps ObjectClasses to the corresponding catalog OIDs.
76  * See also getObjectClass().
77  */
78 static const Oid object_classes[MAX_OCLASS] = {
79         RelationRelationId,                     /* OCLASS_CLASS */
80         ProcedureRelationId,            /* OCLASS_PROC */
81         TypeRelationId,                         /* OCLASS_TYPE */
82         CastRelationId,                         /* OCLASS_CAST */
83         ConstraintRelationId,           /* OCLASS_CONSTRAINT */
84         ConversionRelationId,           /* OCLASS_CONVERSION */
85         AttrDefaultRelationId,          /* OCLASS_DEFAULT */
86         LanguageRelationId,                     /* OCLASS_LANGUAGE */
87         OperatorRelationId,                     /* OCLASS_OPERATOR */
88         OperatorClassRelationId,        /* OCLASS_OPCLASS */
89         RewriteRelationId,                      /* OCLASS_REWRITE */
90         TriggerRelationId,                      /* OCLASS_TRIGGER */
91         NamespaceRelationId                     /* OCLASS_SCHEMA */
92 };
93
94
95 static void findAutoDeletableObjects(const ObjectAddress *object,
96                                                  ObjectAddresses *oktodelete,
97                                                  Relation depRel);
98 static bool recursiveDeletion(const ObjectAddress *object,
99                                   DropBehavior behavior,
100                                   int msglevel,
101                                   const ObjectAddress *callingObject,
102                                   ObjectAddresses *oktodelete,
103                                   Relation depRel);
104 static bool deleteDependentObjects(const ObjectAddress *object,
105                                            const char *objDescription,
106                                            DropBehavior behavior,
107                                            int msglevel,
108                                            ObjectAddresses *oktodelete,
109                                            Relation depRel);
110 static void doDeletion(const ObjectAddress *object);
111 static bool find_expr_references_walker(Node *node,
112                                                         find_expr_references_context *context);
113 static void eliminate_duplicate_dependencies(ObjectAddresses *addrs);
114 static int      object_address_comparator(const void *a, const void *b);
115 static void init_object_addresses(ObjectAddresses *addrs);
116 static void add_object_address(ObjectClass oclass, Oid objectId, int32 subId,
117                                    ObjectAddresses *addrs);
118 static void add_exact_object_address(const ObjectAddress *object,
119                                                  ObjectAddresses *addrs);
120 static bool object_address_present(const ObjectAddress *object,
121                                            ObjectAddresses *addrs);
122 static void term_object_addresses(ObjectAddresses *addrs);
123 static void getRelationDescription(StringInfo buffer, Oid relid);
124
125
126 /*
127  * performDeletion: attempt to drop the specified object.  If CASCADE
128  * behavior is specified, also drop any dependent objects (recursively).
129  * If RESTRICT behavior is specified, error out if there are any dependent
130  * objects, except for those that should be implicitly dropped anyway
131  * according to the dependency type.
132  *
133  * This is the outer control routine for all forms of DROP that drop objects
134  * that can participate in dependencies.
135  */
136 void
137 performDeletion(const ObjectAddress *object,
138                                 DropBehavior behavior)
139 {
140         char       *objDescription;
141         Relation        depRel;
142         ObjectAddresses oktodelete;
143
144         /*
145          * Get object description for possible use in failure message. Must do
146          * this before deleting it ...
147          */
148         objDescription = getObjectDescription(object);
149
150         /*
151          * We save some cycles by opening pg_depend just once and passing the
152          * Relation pointer down to all the recursive deletion steps.
153          */
154         depRel = heap_open(DependRelationId, RowExclusiveLock);
155
156         /*
157          * Construct a list of objects that are reachable by AUTO or INTERNAL
158          * dependencies from the target object.  These should be deleted silently,
159          * even if the actual deletion pass first reaches one of them via a
160          * non-auto dependency.
161          */
162         init_object_addresses(&oktodelete);
163
164         findAutoDeletableObjects(object, &oktodelete, depRel);
165
166         if (!recursiveDeletion(object, behavior, NOTICE,
167                                                    NULL, &oktodelete, depRel))
168                 ereport(ERROR,
169                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
170                                  errmsg("cannot drop %s because other objects depend on it",
171                                                 objDescription),
172                 errhint("Use DROP ... CASCADE to drop the dependent objects too.")));
173
174         term_object_addresses(&oktodelete);
175
176         heap_close(depRel, RowExclusiveLock);
177
178         pfree(objDescription);
179 }
180
181
182 /*
183  * deleteWhatDependsOn: attempt to drop everything that depends on the
184  * specified object, though not the object itself.      Behavior is always
185  * CASCADE.
186  *
187  * This is currently used only to clean out the contents of a schema
188  * (namespace): the passed object is a namespace.  We normally want this
189  * to be done silently, so there's an option to suppress NOTICE messages.
190  */
191 void
192 deleteWhatDependsOn(const ObjectAddress *object,
193                                         bool showNotices)
194 {
195         char       *objDescription;
196         Relation        depRel;
197         ObjectAddresses oktodelete;
198
199         /*
200          * Get object description for possible use in failure messages
201          */
202         objDescription = getObjectDescription(object);
203
204         /*
205          * We save some cycles by opening pg_depend just once and passing the
206          * Relation pointer down to all the recursive deletion steps.
207          */
208         depRel = heap_open(DependRelationId, RowExclusiveLock);
209
210         /*
211          * Construct a list of objects that are reachable by AUTO or INTERNAL
212          * dependencies from the target object.  These should be deleted silently,
213          * even if the actual deletion pass first reaches one of them via a
214          * non-auto dependency.
215          */
216         init_object_addresses(&oktodelete);
217
218         findAutoDeletableObjects(object, &oktodelete, depRel);
219
220         /*
221          * Now invoke only step 2 of recursiveDeletion: just recurse to the stuff
222          * dependent on the given object.
223          */
224         if (!deleteDependentObjects(object, objDescription,
225                                                                 DROP_CASCADE,
226                                                                 showNotices ? NOTICE : DEBUG2,
227                                                                 &oktodelete, depRel))
228                 ereport(ERROR,
229                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
230                                  errmsg("failed to drop all objects depending on %s",
231                                                 objDescription)));
232
233         /*
234          * We do not need CommandCounterIncrement here, since if step 2 did
235          * anything then each recursive call will have ended with one.
236          */
237
238         term_object_addresses(&oktodelete);
239
240         heap_close(depRel, RowExclusiveLock);
241
242         pfree(objDescription);
243 }
244
245
246 /*
247  * findAutoDeletableObjects: find all objects that are reachable by AUTO or
248  * INTERNAL dependency paths from the given object.  Add them all to the
249  * oktodelete list.  Note that the originally given object will also be
250  * added to the list.
251  *
252  * depRel is the already-open pg_depend relation.
253  */
254 static void
255 findAutoDeletableObjects(const ObjectAddress *object,
256                                                  ObjectAddresses *oktodelete,
257                                                  Relation depRel)
258 {
259         ScanKeyData key[3];
260         int                     nkeys;
261         SysScanDesc scan;
262         HeapTuple       tup;
263         ObjectAddress otherObject;
264
265         /*
266          * If this object is already in oktodelete, then we already visited it;
267          * don't do so again (this prevents infinite recursion if there's a loop
268          * in pg_depend).  Otherwise, add it.
269          */
270         if (object_address_present(object, oktodelete))
271                 return;
272         add_exact_object_address(object, oktodelete);
273
274         /*
275          * Scan pg_depend records that link to this object, showing the things
276          * that depend on it.  For each one that is AUTO or INTERNAL, visit the
277          * referencing object.
278          *
279          * When dropping a whole object (subId = 0), find pg_depend records for
280          * its sub-objects too.
281          */
282         ScanKeyInit(&key[0],
283                                 Anum_pg_depend_refclassid,
284                                 BTEqualStrategyNumber, F_OIDEQ,
285                                 ObjectIdGetDatum(object->classId));
286         ScanKeyInit(&key[1],
287                                 Anum_pg_depend_refobjid,
288                                 BTEqualStrategyNumber, F_OIDEQ,
289                                 ObjectIdGetDatum(object->objectId));
290         if (object->objectSubId != 0)
291         {
292                 ScanKeyInit(&key[2],
293                                         Anum_pg_depend_refobjsubid,
294                                         BTEqualStrategyNumber, F_INT4EQ,
295                                         Int32GetDatum(object->objectSubId));
296                 nkeys = 3;
297         }
298         else
299                 nkeys = 2;
300
301         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
302                                                           SnapshotNow, nkeys, key);
303
304         while (HeapTupleIsValid(tup = systable_getnext(scan)))
305         {
306                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
307
308                 switch (foundDep->deptype)
309                 {
310                         case DEPENDENCY_NORMAL:
311                                 /* ignore */
312                                 break;
313                         case DEPENDENCY_AUTO:
314                         case DEPENDENCY_INTERNAL:
315                                 /* recurse */
316                                 otherObject.classId = foundDep->classid;
317                                 otherObject.objectId = foundDep->objid;
318                                 otherObject.objectSubId = foundDep->objsubid;
319                                 findAutoDeletableObjects(&otherObject, oktodelete, depRel);
320                                 break;
321                         case DEPENDENCY_PIN:
322
323                                 /*
324                                  * For a PIN dependency we just ereport immediately; there
325                                  * won't be any others to examine, and we aren't ever going to
326                                  * let the user delete it.
327                                  */
328                                 ereport(ERROR,
329                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
330                                                  errmsg("cannot drop %s because it is required by the database system",
331                                                                 getObjectDescription(object))));
332                                 break;
333                         default:
334                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
335                                          foundDep->deptype, getObjectDescription(object));
336                                 break;
337                 }
338         }
339
340         systable_endscan(scan);
341 }
342
343
344 /*
345  * recursiveDeletion: delete a single object for performDeletion, plus
346  * (recursively) anything that depends on it.
347  *
348  * Returns TRUE if successful, FALSE if not.
349  *
350  * callingObject is NULL at the outer level, else identifies the object that
351  * we recursed from (the reference object that someone else needs to delete).
352  *
353  * oktodelete is a list of objects verified deletable (ie, reachable by one
354  * or more AUTO or INTERNAL dependencies from the original target).
355  *
356  * depRel is the already-open pg_depend relation.
357  *
358  *
359  * In RESTRICT mode, we perform all the deletions anyway, but ereport a message
360  * and return FALSE if we find a restriction violation.  performDeletion
361  * will then abort the transaction to nullify the deletions.  We have to
362  * do it this way to (a) report all the direct and indirect dependencies
363  * while (b) not going into infinite recursion if there's a cycle.
364  *
365  * This is even more complex than one could wish, because it is possible for
366  * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL
367  * dependencies.  Also, we might have a situation where we've been asked to
368  * delete object A, and objects B and C both have AUTO dependencies on A,
369  * but B also has a NORMAL dependency on C.  (Since any of these paths might
370  * be indirect, we can't prevent these scenarios, but must cope instead.)
371  * If we visit C before B then we would mistakenly decide that the B->C link
372  * should prevent the restricted drop from occurring.  To handle this, we make
373  * a pre-scan to find all the objects that are auto-deletable from A.  If we
374  * visit C first, but B is present in the oktodelete list, then we make no
375  * complaint but recurse to delete B anyway.  (Note that in general we must
376  * delete B before deleting C; the drop routine for B may try to access C.)
377  *
378  * Note: in the case where the path to B is traversed first, we will not
379  * see the NORMAL dependency when we reach C, because of the pg_depend
380  * removals done in step 1.  The oktodelete list is necessary just
381  * to make the behavior independent of the order in which pg_depend
382  * entries are visited.
383  */
384 static bool
385 recursiveDeletion(const ObjectAddress *object,
386                                   DropBehavior behavior,
387                                   int msglevel,
388                                   const ObjectAddress *callingObject,
389                                   ObjectAddresses *oktodelete,
390                                   Relation depRel)
391 {
392         bool            ok = true;
393         char       *objDescription;
394         ScanKeyData key[3];
395         int                     nkeys;
396         SysScanDesc scan;
397         HeapTuple       tup;
398         ObjectAddress otherObject;
399         ObjectAddress owningObject;
400         bool            amOwned = false;
401
402         /*
403          * Get object description for possible use in messages.  Must do this
404          * before deleting it ...
405          */
406         objDescription = getObjectDescription(object);
407
408         /*
409          * Step 1: find and remove pg_depend records that link from this object to
410          * others.      We have to do this anyway, and doing it first ensures that we
411          * avoid infinite recursion in the case of cycles. Also, some dependency
412          * types require extra processing here.
413          *
414          * When dropping a whole object (subId = 0), remove all pg_depend records
415          * for its sub-objects too.
416          */
417         ScanKeyInit(&key[0],
418                                 Anum_pg_depend_classid,
419                                 BTEqualStrategyNumber, F_OIDEQ,
420                                 ObjectIdGetDatum(object->classId));
421         ScanKeyInit(&key[1],
422                                 Anum_pg_depend_objid,
423                                 BTEqualStrategyNumber, F_OIDEQ,
424                                 ObjectIdGetDatum(object->objectId));
425         if (object->objectSubId != 0)
426         {
427                 ScanKeyInit(&key[2],
428                                         Anum_pg_depend_objsubid,
429                                         BTEqualStrategyNumber, F_INT4EQ,
430                                         Int32GetDatum(object->objectSubId));
431                 nkeys = 3;
432         }
433         else
434                 nkeys = 2;
435
436         scan = systable_beginscan(depRel, DependDependerIndexId, true,
437                                                           SnapshotNow, nkeys, key);
438
439         while (HeapTupleIsValid(tup = systable_getnext(scan)))
440         {
441                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
442
443                 otherObject.classId = foundDep->refclassid;
444                 otherObject.objectId = foundDep->refobjid;
445                 otherObject.objectSubId = foundDep->refobjsubid;
446
447                 switch (foundDep->deptype)
448                 {
449                         case DEPENDENCY_NORMAL:
450                         case DEPENDENCY_AUTO:
451                                 /* no problem */
452                                 break;
453                         case DEPENDENCY_INTERNAL:
454
455                                 /*
456                                  * This object is part of the internal implementation of
457                                  * another object.      We have three cases:
458                                  *
459                                  * 1. At the outermost recursion level, disallow the DROP. (We
460                                  * just ereport here, rather than proceeding, since no other
461                                  * dependencies are likely to be interesting.)
462                                  */
463                                 if (callingObject == NULL)
464                                 {
465                                         char       *otherObjDesc = getObjectDescription(&otherObject);
466
467                                         ereport(ERROR,
468                                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
469                                                          errmsg("cannot drop %s because %s requires it",
470                                                                         objDescription, otherObjDesc),
471                                                          errhint("You may drop %s instead.",
472                                                                          otherObjDesc)));
473                                 }
474
475                                 /*
476                                  * 2. When recursing from the other end of this dependency,
477                                  * it's okay to continue with the deletion. This holds when
478                                  * recursing from a whole object that includes the nominal
479                                  * other end as a component, too.
480                                  */
481                                 if (callingObject->classId == otherObject.classId &&
482                                         callingObject->objectId == otherObject.objectId &&
483                                         (callingObject->objectSubId == otherObject.objectSubId ||
484                                          callingObject->objectSubId == 0))
485                                         break;
486
487                                 /*
488                                  * 3. When recursing from anyplace else, transform this
489                                  * deletion request into a delete of the other object. (This
490                                  * will be an error condition iff RESTRICT mode.) In this case
491                                  * we finish deleting my dependencies except for the INTERNAL
492                                  * link, which will be needed to cause the owning object to
493                                  * recurse back to me.
494                                  */
495                                 if (amOwned)    /* shouldn't happen */
496                                         elog(ERROR, "multiple INTERNAL dependencies for %s",
497                                                  objDescription);
498                                 owningObject = otherObject;
499                                 amOwned = true;
500                                 /* "continue" bypasses the simple_heap_delete call below */
501                                 continue;
502                         case DEPENDENCY_PIN:
503
504                                 /*
505                                  * Should not happen; PIN dependencies should have zeroes in
506                                  * the depender fields...
507                                  */
508                                 elog(ERROR, "incorrect use of PIN dependency with %s",
509                                          objDescription);
510                                 break;
511                         default:
512                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
513                                          foundDep->deptype, objDescription);
514                                 break;
515                 }
516
517                 /* delete the pg_depend tuple */
518                 simple_heap_delete(depRel, &tup->t_self);
519         }
520
521         systable_endscan(scan);
522
523         /*
524          * CommandCounterIncrement here to ensure that preceding changes are all
525          * visible; in particular, that the above deletions of pg_depend entries
526          * are visible.  That prevents infinite recursion in case of a dependency
527          * loop (which is perfectly legal).
528          */
529         CommandCounterIncrement();
530
531         /*
532          * If we found we are owned by another object, ask it to delete itself
533          * instead of proceeding.  Complain if RESTRICT mode, unless the other
534          * object is in oktodelete.
535          */
536         if (amOwned)
537         {
538                 if (object_address_present(&owningObject, oktodelete))
539                         ereport(DEBUG2,
540                                         (errmsg("drop auto-cascades to %s",
541                                                         getObjectDescription(&owningObject))));
542                 else if (behavior == DROP_RESTRICT)
543                 {
544                         ereport(msglevel,
545                                         (errmsg("%s depends on %s",
546                                                         getObjectDescription(&owningObject),
547                                                         objDescription)));
548                         ok = false;
549                 }
550                 else
551                         ereport(msglevel,
552                                         (errmsg("drop cascades to %s",
553                                                         getObjectDescription(&owningObject))));
554
555                 if (!recursiveDeletion(&owningObject, behavior, msglevel,
556                                                            object, oktodelete, depRel))
557                         ok = false;
558
559                 pfree(objDescription);
560
561                 return ok;
562         }
563
564         /*
565          * Step 2: scan pg_depend records that link to this object, showing the
566          * things that depend on it.  Recursively delete those things. Note it's
567          * important to delete the dependent objects before the referenced one,
568          * since the deletion routines might do things like try to update the
569          * pg_class record when deleting a check constraint.
570          */
571         if (!deleteDependentObjects(object, objDescription,
572                                                                 behavior, msglevel,
573                                                                 oktodelete, depRel))
574                 ok = false;
575
576         /*
577          * We do not need CommandCounterIncrement here, since if step 2 did
578          * anything then each recursive call will have ended with one.
579          */
580
581         /*
582          * Step 3: delete the object itself.
583          */
584         doDeletion(object);
585
586         /*
587          * Delete any comments associated with this object.  (This is a convenient
588          * place to do it instead of having every object type know to do it.)
589          */
590         DeleteComments(object->objectId, object->classId, object->objectSubId);
591
592         /*
593          * Delete shared dependency references related to this object. Sub-objects
594          * (columns) don't have dependencies on global objects, so skip them.
595          */
596         if (object->objectSubId == 0)
597                 deleteSharedDependencyRecordsFor(object->classId, object->objectId);
598
599         /*
600          * CommandCounterIncrement here to ensure that preceding changes are all
601          * visible.
602          */
603         CommandCounterIncrement();
604
605         /*
606          * And we're done!
607          */
608         pfree(objDescription);
609
610         return ok;
611 }
612
613
614 /*
615  * deleteDependentObjects - find and delete objects that depend on 'object'
616  *
617  * Scan pg_depend records that link to the given object, showing
618  * the things that depend on it.  Recursively delete those things. (We
619  * don't delete the pg_depend records here, as the recursive call will
620  * do that.)  Note it's important to delete the dependent objects
621  * before the referenced one, since the deletion routines might do
622  * things like try to update the pg_class record when deleting a check
623  * constraint.
624  *
625  * When dropping a whole object (subId = 0), find pg_depend records for
626  * its sub-objects too.
627  *
628  *      object: the object to find dependencies on
629  *      objDescription: description of object (only used for error messages)
630  *      behavior: desired drop behavior
631  *      oktodelete: stuff that's AUTO-deletable
632  *      depRel: already opened pg_depend relation
633  *
634  * Returns TRUE if all is well, false if any problem found.
635  *
636  * NOTE: because we are using SnapshotNow, if a recursive call deletes
637  * any pg_depend tuples that our scan hasn't yet visited, we will not
638  * see them as good when we do visit them.      This is essential for
639  * correct behavior if there are multiple dependency paths between two
640  * objects --- else we might try to delete an already-deleted object.
641  */
642 static bool
643 deleteDependentObjects(const ObjectAddress *object,
644                                            const char *objDescription,
645                                            DropBehavior behavior,
646                                            int msglevel,
647                                            ObjectAddresses *oktodelete,
648                                            Relation depRel)
649 {
650         bool            ok = true;
651         ScanKeyData key[3];
652         int                     nkeys;
653         SysScanDesc scan;
654         HeapTuple       tup;
655         ObjectAddress otherObject;
656
657         ScanKeyInit(&key[0],
658                                 Anum_pg_depend_refclassid,
659                                 BTEqualStrategyNumber, F_OIDEQ,
660                                 ObjectIdGetDatum(object->classId));
661         ScanKeyInit(&key[1],
662                                 Anum_pg_depend_refobjid,
663                                 BTEqualStrategyNumber, F_OIDEQ,
664                                 ObjectIdGetDatum(object->objectId));
665         if (object->objectSubId != 0)
666         {
667                 ScanKeyInit(&key[2],
668                                         Anum_pg_depend_refobjsubid,
669                                         BTEqualStrategyNumber, F_INT4EQ,
670                                         Int32GetDatum(object->objectSubId));
671                 nkeys = 3;
672         }
673         else
674                 nkeys = 2;
675
676         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
677                                                           SnapshotNow, nkeys, key);
678
679         while (HeapTupleIsValid(tup = systable_getnext(scan)))
680         {
681                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
682
683                 otherObject.classId = foundDep->classid;
684                 otherObject.objectId = foundDep->objid;
685                 otherObject.objectSubId = foundDep->objsubid;
686
687                 switch (foundDep->deptype)
688                 {
689                         case DEPENDENCY_NORMAL:
690
691                                 /*
692                                  * Perhaps there was another dependency path that would have
693                                  * allowed silent deletion of the otherObject, had we only
694                                  * taken that path first. In that case, act like this link is
695                                  * AUTO, too.
696                                  */
697                                 if (object_address_present(&otherObject, oktodelete))
698                                         ereport(DEBUG2,
699                                                         (errmsg("drop auto-cascades to %s",
700                                                                         getObjectDescription(&otherObject))));
701                                 else if (behavior == DROP_RESTRICT)
702                                 {
703                                         ereport(msglevel,
704                                                         (errmsg("%s depends on %s",
705                                                                         getObjectDescription(&otherObject),
706                                                                         objDescription)));
707                                         ok = false;
708                                 }
709                                 else
710                                         ereport(msglevel,
711                                                         (errmsg("drop cascades to %s",
712                                                                         getObjectDescription(&otherObject))));
713
714                                 if (!recursiveDeletion(&otherObject, behavior, msglevel,
715                                                                            object, oktodelete, depRel))
716                                         ok = false;
717                                 break;
718                         case DEPENDENCY_AUTO:
719                         case DEPENDENCY_INTERNAL:
720
721                                 /*
722                                  * We propagate the DROP without complaint even in the
723                                  * RESTRICT case.  (However, normal dependencies on the
724                                  * component object could still cause failure.)
725                                  */
726                                 ereport(DEBUG2,
727                                                 (errmsg("drop auto-cascades to %s",
728                                                                 getObjectDescription(&otherObject))));
729
730                                 if (!recursiveDeletion(&otherObject, behavior, msglevel,
731                                                                            object, oktodelete, depRel))
732                                         ok = false;
733                                 break;
734                         case DEPENDENCY_PIN:
735
736                                 /*
737                                  * For a PIN dependency we just ereport immediately; there
738                                  * won't be any others to report.
739                                  */
740                                 ereport(ERROR,
741                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
742                                                  errmsg("cannot drop %s because it is required by the database system",
743                                                                 objDescription)));
744                                 break;
745                         default:
746                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
747                                          foundDep->deptype, objDescription);
748                                 break;
749                 }
750         }
751
752         systable_endscan(scan);
753
754         return ok;
755 }
756
757
758 /*
759  * doDeletion: actually delete a single object
760  */
761 static void
762 doDeletion(const ObjectAddress *object)
763 {
764         switch (getObjectClass(object))
765         {
766                 case OCLASS_CLASS:
767                         {
768                                 char            relKind = get_rel_relkind(object->objectId);
769
770                                 if (relKind == RELKIND_INDEX)
771                                 {
772                                         Assert(object->objectSubId == 0);
773                                         index_drop(object->objectId);
774                                 }
775                                 else
776                                 {
777                                         if (object->objectSubId != 0)
778                                                 RemoveAttributeById(object->objectId,
779                                                                                         object->objectSubId);
780                                         else
781                                                 heap_drop_with_catalog(object->objectId);
782                                 }
783                                 break;
784                         }
785
786                 case OCLASS_PROC:
787                         RemoveFunctionById(object->objectId);
788                         break;
789
790                 case OCLASS_TYPE:
791                         RemoveTypeById(object->objectId);
792                         break;
793
794                 case OCLASS_CAST:
795                         DropCastById(object->objectId);
796                         break;
797
798                 case OCLASS_CONSTRAINT:
799                         RemoveConstraintById(object->objectId);
800                         break;
801
802                 case OCLASS_CONVERSION:
803                         RemoveConversionById(object->objectId);
804                         break;
805
806                 case OCLASS_DEFAULT:
807                         RemoveAttrDefaultById(object->objectId);
808                         break;
809
810                 case OCLASS_LANGUAGE:
811                         DropProceduralLanguageById(object->objectId);
812                         break;
813
814                 case OCLASS_OPERATOR:
815                         RemoveOperatorById(object->objectId);
816                         break;
817
818                 case OCLASS_OPCLASS:
819                         RemoveOpClassById(object->objectId);
820                         break;
821
822                 case OCLASS_REWRITE:
823                         RemoveRewriteRuleById(object->objectId);
824                         break;
825
826                 case OCLASS_TRIGGER:
827                         RemoveTriggerById(object->objectId);
828                         break;
829
830                 case OCLASS_SCHEMA:
831                         RemoveSchemaById(object->objectId);
832                         break;
833
834                 default:
835                         elog(ERROR, "unrecognized object class: %u",
836                                  object->classId);
837         }
838 }
839
840 /*
841  * recordDependencyOnExpr - find expression dependencies
842  *
843  * This is used to find the dependencies of rules, constraint expressions,
844  * etc.
845  *
846  * Given an expression or query in node-tree form, find all the objects
847  * it refers to (tables, columns, operators, functions, etc).  Record
848  * a dependency of the specified type from the given depender object
849  * to each object mentioned in the expression.
850  *
851  * rtable is the rangetable to be used to interpret Vars with varlevelsup=0.
852  * It can be NIL if no such variables are expected.
853  *
854  * XXX is it important to create dependencies on the datatypes mentioned in
855  * the expression?      In most cases this would be redundant (eg, a ref to an
856  * operator indirectly references its input and output datatypes), but I'm
857  * not quite convinced there are no cases where we need it.
858  */
859 void
860 recordDependencyOnExpr(const ObjectAddress *depender,
861                                            Node *expr, List *rtable,
862                                            DependencyType behavior)
863 {
864         find_expr_references_context context;
865
866         init_object_addresses(&context.addrs);
867
868         /* Set up interpretation for Vars at varlevelsup = 0 */
869         context.rtables = list_make1(rtable);
870
871         /* Scan the expression tree for referenceable objects */
872         find_expr_references_walker(expr, &context);
873
874         /* Remove any duplicates */
875         eliminate_duplicate_dependencies(&context.addrs);
876
877         /* And record 'em */
878         recordMultipleDependencies(depender,
879                                                            context.addrs.refs, context.addrs.numrefs,
880                                                            behavior);
881
882         term_object_addresses(&context.addrs);
883 }
884
885 /*
886  * recordDependencyOnSingleRelExpr - find expression dependencies
887  *
888  * As above, but only one relation is expected to be referenced (with
889  * varno = 1 and varlevelsup = 0).      Pass the relation OID instead of a
890  * range table.  An additional frammish is that dependencies on that
891  * relation (or its component columns) will be marked with 'self_behavior',
892  * whereas 'behavior' is used for everything else.
893  */
894 void
895 recordDependencyOnSingleRelExpr(const ObjectAddress *depender,
896                                                                 Node *expr, Oid relId,
897                                                                 DependencyType behavior,
898                                                                 DependencyType self_behavior)
899 {
900         find_expr_references_context context;
901         RangeTblEntry rte;
902
903         init_object_addresses(&context.addrs);
904
905         /* We gin up a rather bogus rangetable list to handle Vars */
906         MemSet(&rte, 0, sizeof(rte));
907         rte.type = T_RangeTblEntry;
908         rte.rtekind = RTE_RELATION;
909         rte.relid = relId;
910
911         context.rtables = list_make1(list_make1(&rte));
912
913         /* Scan the expression tree for referenceable objects */
914         find_expr_references_walker(expr, &context);
915
916         /* Remove any duplicates */
917         eliminate_duplicate_dependencies(&context.addrs);
918
919         /* Separate self-dependencies if necessary */
920         if (behavior != self_behavior && context.addrs.numrefs > 0)
921         {
922                 ObjectAddresses self_addrs;
923                 ObjectAddress *outobj;
924                 int                     oldref,
925                                         outrefs;
926
927                 init_object_addresses(&self_addrs);
928
929                 outobj = context.addrs.refs;
930                 outrefs = 0;
931                 for (oldref = 0; oldref < context.addrs.numrefs; oldref++)
932                 {
933                         ObjectAddress *thisobj = context.addrs.refs + oldref;
934
935                         if (thisobj->classId == RelationRelationId &&
936                                 thisobj->objectId == relId)
937                         {
938                                 /* Move this ref into self_addrs */
939                                 add_object_address(OCLASS_CLASS, relId, thisobj->objectSubId,
940                                                                    &self_addrs);
941                         }
942                         else
943                         {
944                                 /* Keep it in context.addrs */
945                                 outobj->classId = thisobj->classId;
946                                 outobj->objectId = thisobj->objectId;
947                                 outobj->objectSubId = thisobj->objectSubId;
948                                 outobj++;
949                                 outrefs++;
950                         }
951                 }
952                 context.addrs.numrefs = outrefs;
953
954                 /* Record the self-dependencies */
955                 recordMultipleDependencies(depender,
956                                                                    self_addrs.refs, self_addrs.numrefs,
957                                                                    self_behavior);
958
959                 term_object_addresses(&self_addrs);
960         }
961
962         /* Record the external dependencies */
963         recordMultipleDependencies(depender,
964                                                            context.addrs.refs, context.addrs.numrefs,
965                                                            behavior);
966
967         term_object_addresses(&context.addrs);
968 }
969
970 /*
971  * Recursively search an expression tree for object references.
972  *
973  * Note: we avoid creating references to columns of tables that participate
974  * in an SQL JOIN construct, but are not actually used anywhere in the query.
975  * To do so, we do not scan the joinaliasvars list of a join RTE while
976  * scanning the query rangetable, but instead scan each individual entry
977  * of the alias list when we find a reference to it.
978  */
979 static bool
980 find_expr_references_walker(Node *node,
981                                                         find_expr_references_context *context)
982 {
983         if (node == NULL)
984                 return false;
985         if (IsA(node, Var))
986         {
987                 Var                *var = (Var *) node;
988                 List       *rtable;
989                 RangeTblEntry *rte;
990
991                 /* Find matching rtable entry, or complain if not found */
992                 if (var->varlevelsup >= list_length(context->rtables))
993                         elog(ERROR, "invalid varlevelsup %d", var->varlevelsup);
994                 rtable = (List *) list_nth(context->rtables, var->varlevelsup);
995                 if (var->varno <= 0 || var->varno > list_length(rtable))
996                         elog(ERROR, "invalid varno %d", var->varno);
997                 rte = rt_fetch(var->varno, rtable);
998
999                 /*
1000                  * A whole-row Var references no specific columns, so adds no new
1001                  * dependency.
1002                  */
1003                 if (var->varattno == InvalidAttrNumber)
1004                         return false;
1005                 if (rte->rtekind == RTE_RELATION)
1006                 {
1007                         /* If it's a plain relation, reference this column */
1008                         add_object_address(OCLASS_CLASS, rte->relid, var->varattno,
1009                                                            &context->addrs);
1010                 }
1011                 else if (rte->rtekind == RTE_JOIN)
1012                 {
1013                         /* Scan join output column to add references to join inputs */
1014                         List       *save_rtables;
1015
1016                         /* We must make the context appropriate for join's level */
1017                         save_rtables = context->rtables;
1018                         context->rtables = list_copy_tail(context->rtables,
1019                                                                                           var->varlevelsup);
1020                         if (var->varattno <= 0 ||
1021                                 var->varattno > list_length(rte->joinaliasvars))
1022                                 elog(ERROR, "invalid varattno %d", var->varattno);
1023                         find_expr_references_walker((Node *) list_nth(rte->joinaliasvars,
1024                                                                                                                   var->varattno - 1),
1025                                                                                 context);
1026                         list_free(context->rtables);
1027                         context->rtables = save_rtables;
1028                 }
1029                 return false;
1030         }
1031         if (IsA(node, Const))
1032         {
1033                 Const      *con = (Const *) node;
1034                 Oid                     objoid;
1035
1036                 /*
1037                  * If it's a regclass or similar literal referring to an existing
1038                  * object, add a reference to that object.      (Currently, only the
1039                  * regclass case has any likely use, but we may as well handle all the
1040                  * OID-alias datatypes consistently.)
1041                  */
1042                 if (!con->constisnull)
1043                 {
1044                         switch (con->consttype)
1045                         {
1046                                 case REGPROCOID:
1047                                 case REGPROCEDUREOID:
1048                                         objoid = DatumGetObjectId(con->constvalue);
1049                                         if (SearchSysCacheExists(PROCOID,
1050                                                                                          ObjectIdGetDatum(objoid),
1051                                                                                          0, 0, 0))
1052                                                 add_object_address(OCLASS_PROC, objoid, 0,
1053                                                                                    &context->addrs);
1054                                         break;
1055                                 case REGOPEROID:
1056                                 case REGOPERATOROID:
1057                                         objoid = DatumGetObjectId(con->constvalue);
1058                                         if (SearchSysCacheExists(OPEROID,
1059                                                                                          ObjectIdGetDatum(objoid),
1060                                                                                          0, 0, 0))
1061                                                 add_object_address(OCLASS_OPERATOR, objoid, 0,
1062                                                                                    &context->addrs);
1063                                         break;
1064                                 case REGCLASSOID:
1065                                         objoid = DatumGetObjectId(con->constvalue);
1066                                         if (SearchSysCacheExists(RELOID,
1067                                                                                          ObjectIdGetDatum(objoid),
1068                                                                                          0, 0, 0))
1069                                                 add_object_address(OCLASS_CLASS, objoid, 0,
1070                                                                                    &context->addrs);
1071                                         break;
1072                                 case REGTYPEOID:
1073                                         objoid = DatumGetObjectId(con->constvalue);
1074                                         if (SearchSysCacheExists(TYPEOID,
1075                                                                                          ObjectIdGetDatum(objoid),
1076                                                                                          0, 0, 0))
1077                                                 add_object_address(OCLASS_TYPE, objoid, 0,
1078                                                                                    &context->addrs);
1079                                         break;
1080                         }
1081                 }
1082                 return false;
1083         }
1084         if (IsA(node, FuncExpr))
1085         {
1086                 FuncExpr   *funcexpr = (FuncExpr *) node;
1087
1088                 add_object_address(OCLASS_PROC, funcexpr->funcid, 0,
1089                                                    &context->addrs);
1090                 /* fall through to examine arguments */
1091         }
1092         if (IsA(node, OpExpr))
1093         {
1094                 OpExpr     *opexpr = (OpExpr *) node;
1095
1096                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1097                                                    &context->addrs);
1098                 /* fall through to examine arguments */
1099         }
1100         if (IsA(node, DistinctExpr))
1101         {
1102                 DistinctExpr *distinctexpr = (DistinctExpr *) node;
1103
1104                 add_object_address(OCLASS_OPERATOR, distinctexpr->opno, 0,
1105                                                    &context->addrs);
1106                 /* fall through to examine arguments */
1107         }
1108         if (IsA(node, ScalarArrayOpExpr))
1109         {
1110                 ScalarArrayOpExpr *opexpr = (ScalarArrayOpExpr *) node;
1111
1112                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1113                                                    &context->addrs);
1114                 /* fall through to examine arguments */
1115         }
1116         if (IsA(node, NullIfExpr))
1117         {
1118                 NullIfExpr *nullifexpr = (NullIfExpr *) node;
1119
1120                 add_object_address(OCLASS_OPERATOR, nullifexpr->opno, 0,
1121                                                    &context->addrs);
1122                 /* fall through to examine arguments */
1123         }
1124         if (IsA(node, Aggref))
1125         {
1126                 Aggref     *aggref = (Aggref *) node;
1127
1128                 add_object_address(OCLASS_PROC, aggref->aggfnoid, 0,
1129                                                    &context->addrs);
1130                 /* fall through to examine arguments */
1131         }
1132         if (is_subplan(node))
1133         {
1134                 /* Extra work needed here if we ever need this case */
1135                 elog(ERROR, "already-planned subqueries not supported");
1136         }
1137         if (IsA(node, RowCompareExpr))
1138         {
1139                 RowCompareExpr *rcexpr = (RowCompareExpr *) node;
1140                 ListCell   *l;
1141
1142                 foreach(l, rcexpr->opnos)
1143                 {
1144                         add_object_address(OCLASS_OPERATOR, lfirst_oid(l), 0,
1145                                                            &context->addrs);
1146                 }
1147                 foreach(l, rcexpr->opclasses)
1148                 {
1149                         add_object_address(OCLASS_OPCLASS, lfirst_oid(l), 0,
1150                                                            &context->addrs);
1151                 }
1152                 /* fall through to examine arguments */
1153         }
1154         if (IsA(node, Query))
1155         {
1156                 /* Recurse into RTE subquery or not-yet-planned sublink subquery */
1157                 Query      *query = (Query *) node;
1158                 ListCell   *rtable;
1159                 bool            result;
1160
1161                 /*
1162                  * Add whole-relation refs for each plain relation mentioned in the
1163                  * subquery's rtable.  (Note: query_tree_walker takes care of
1164                  * recursing into RTE_FUNCTION and RTE_SUBQUERY RTEs, so no need to do
1165                  * that here.  But keep it from looking at join alias lists.)
1166                  */
1167                 foreach(rtable, query->rtable)
1168                 {
1169                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
1170
1171                         if (rte->rtekind == RTE_RELATION)
1172                                 add_object_address(OCLASS_CLASS, rte->relid, 0,
1173                                                                    &context->addrs);
1174                 }
1175
1176                 /* Examine substructure of query */
1177                 context->rtables = lcons(query->rtable, context->rtables);
1178                 result = query_tree_walker(query,
1179                                                                    find_expr_references_walker,
1180                                                                    (void *) context,
1181                                                                    QTW_IGNORE_JOINALIASES);
1182                 context->rtables = list_delete_first(context->rtables);
1183                 return result;
1184         }
1185         return expression_tree_walker(node, find_expr_references_walker,
1186                                                                   (void *) context);
1187 }
1188
1189 /*
1190  * Given an array of dependency references, eliminate any duplicates.
1191  */
1192 static void
1193 eliminate_duplicate_dependencies(ObjectAddresses *addrs)
1194 {
1195         ObjectAddress *priorobj;
1196         int                     oldref,
1197                                 newrefs;
1198
1199         if (addrs->numrefs <= 1)
1200                 return;                                 /* nothing to do */
1201
1202         /* Sort the refs so that duplicates are adjacent */
1203         qsort((void *) addrs->refs, addrs->numrefs, sizeof(ObjectAddress),
1204                   object_address_comparator);
1205
1206         /* Remove dups */
1207         priorobj = addrs->refs;
1208         newrefs = 1;
1209         for (oldref = 1; oldref < addrs->numrefs; oldref++)
1210         {
1211                 ObjectAddress *thisobj = addrs->refs + oldref;
1212
1213                 if (priorobj->classId == thisobj->classId &&
1214                         priorobj->objectId == thisobj->objectId)
1215                 {
1216                         if (priorobj->objectSubId == thisobj->objectSubId)
1217                                 continue;               /* identical, so drop thisobj */
1218
1219                         /*
1220                          * If we have a whole-object reference and a reference to a part
1221                          * of the same object, we don't need the whole-object reference
1222                          * (for example, we don't need to reference both table foo and
1223                          * column foo.bar).  The whole-object reference will always appear
1224                          * first in the sorted list.
1225                          */
1226                         if (priorobj->objectSubId == 0)
1227                         {
1228                                 /* replace whole ref with partial */
1229                                 priorobj->objectSubId = thisobj->objectSubId;
1230                                 continue;
1231                         }
1232                 }
1233                 /* Not identical, so add thisobj to output set */
1234                 priorobj++;
1235                 priorobj->classId = thisobj->classId;
1236                 priorobj->objectId = thisobj->objectId;
1237                 priorobj->objectSubId = thisobj->objectSubId;
1238                 newrefs++;
1239         }
1240
1241         addrs->numrefs = newrefs;
1242 }
1243
1244 /*
1245  * qsort comparator for ObjectAddress items
1246  */
1247 static int
1248 object_address_comparator(const void *a, const void *b)
1249 {
1250         const ObjectAddress *obja = (const ObjectAddress *) a;
1251         const ObjectAddress *objb = (const ObjectAddress *) b;
1252
1253         if (obja->classId < objb->classId)
1254                 return -1;
1255         if (obja->classId > objb->classId)
1256                 return 1;
1257         if (obja->objectId < objb->objectId)
1258                 return -1;
1259         if (obja->objectId > objb->objectId)
1260                 return 1;
1261
1262         /*
1263          * We sort the subId as an unsigned int so that 0 will come first. See
1264          * logic in eliminate_duplicate_dependencies.
1265          */
1266         if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId)
1267                 return -1;
1268         if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId)
1269                 return 1;
1270         return 0;
1271 }
1272
1273 /*
1274  * Routines for handling an expansible array of ObjectAddress items.
1275  *
1276  * init_object_addresses: initialize an ObjectAddresses array.
1277  */
1278 static void
1279 init_object_addresses(ObjectAddresses *addrs)
1280 {
1281         /* Initialize array to empty */
1282         addrs->numrefs = 0;
1283         addrs->maxrefs = 32;            /* arbitrary initial array size */
1284         addrs->refs = (ObjectAddress *)
1285                 palloc(addrs->maxrefs * sizeof(ObjectAddress));
1286 }
1287
1288 /*
1289  * Add an entry to an ObjectAddresses array.
1290  *
1291  * It is convenient to specify the class by ObjectClass rather than directly
1292  * by catalog OID.
1293  */
1294 static void
1295 add_object_address(ObjectClass oclass, Oid objectId, int32 subId,
1296                                    ObjectAddresses *addrs)
1297 {
1298         ObjectAddress *item;
1299
1300         /* enlarge array if needed */
1301         if (addrs->numrefs >= addrs->maxrefs)
1302         {
1303                 addrs->maxrefs *= 2;
1304                 addrs->refs = (ObjectAddress *)
1305                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1306         }
1307         /* record this item */
1308         item = addrs->refs + addrs->numrefs;
1309         item->classId = object_classes[oclass];
1310         item->objectId = objectId;
1311         item->objectSubId = subId;
1312         addrs->numrefs++;
1313 }
1314
1315 /*
1316  * Add an entry to an ObjectAddresses array.
1317  *
1318  * As above, but specify entry exactly.
1319  */
1320 static void
1321 add_exact_object_address(const ObjectAddress *object,
1322                                                  ObjectAddresses *addrs)
1323 {
1324         ObjectAddress *item;
1325
1326         /* enlarge array if needed */
1327         if (addrs->numrefs >= addrs->maxrefs)
1328         {
1329                 addrs->maxrefs *= 2;
1330                 addrs->refs = (ObjectAddress *)
1331                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1332         }
1333         /* record this item */
1334         item = addrs->refs + addrs->numrefs;
1335         *item = *object;
1336         addrs->numrefs++;
1337 }
1338
1339 /*
1340  * Test whether an object is present in an ObjectAddresses array.
1341  *
1342  * We return "true" if object is a subobject of something in the array, too.
1343  */
1344 static bool
1345 object_address_present(const ObjectAddress *object,
1346                                            ObjectAddresses *addrs)
1347 {
1348         int                     i;
1349
1350         for (i = addrs->numrefs - 1; i >= 0; i--)
1351         {
1352                 ObjectAddress *thisobj = addrs->refs + i;
1353
1354                 if (object->classId == thisobj->classId &&
1355                         object->objectId == thisobj->objectId)
1356                 {
1357                         if (object->objectSubId == thisobj->objectSubId ||
1358                                 thisobj->objectSubId == 0)
1359                                 return true;
1360                 }
1361         }
1362
1363         return false;
1364 }
1365
1366 /*
1367  * Clean up when done with an ObjectAddresses array.
1368  */
1369 static void
1370 term_object_addresses(ObjectAddresses *addrs)
1371 {
1372         pfree(addrs->refs);
1373 }
1374
1375 /*
1376  * Determine the class of a given object identified by objectAddress.
1377  *
1378  * This function is essentially the reverse mapping for the object_classes[]
1379  * table.  We implement it as a function because the OIDs aren't consecutive.
1380  */
1381 ObjectClass
1382 getObjectClass(const ObjectAddress *object)
1383 {
1384         switch (object->classId)
1385         {
1386                 case RelationRelationId:
1387                         /* caller must check objectSubId */
1388                         return OCLASS_CLASS;
1389
1390                 case ProcedureRelationId:
1391                         Assert(object->objectSubId == 0);
1392                         return OCLASS_PROC;
1393
1394                 case TypeRelationId:
1395                         Assert(object->objectSubId == 0);
1396                         return OCLASS_TYPE;
1397
1398                 case CastRelationId:
1399                         Assert(object->objectSubId == 0);
1400                         return OCLASS_CAST;
1401
1402                 case ConstraintRelationId:
1403                         Assert(object->objectSubId == 0);
1404                         return OCLASS_CONSTRAINT;
1405
1406                 case ConversionRelationId:
1407                         Assert(object->objectSubId == 0);
1408                         return OCLASS_CONVERSION;
1409
1410                 case AttrDefaultRelationId:
1411                         Assert(object->objectSubId == 0);
1412                         return OCLASS_DEFAULT;
1413
1414                 case LanguageRelationId:
1415                         Assert(object->objectSubId == 0);
1416                         return OCLASS_LANGUAGE;
1417
1418                 case OperatorRelationId:
1419                         Assert(object->objectSubId == 0);
1420                         return OCLASS_OPERATOR;
1421
1422                 case OperatorClassRelationId:
1423                         Assert(object->objectSubId == 0);
1424                         return OCLASS_OPCLASS;
1425
1426                 case RewriteRelationId:
1427                         Assert(object->objectSubId == 0);
1428                         return OCLASS_REWRITE;
1429
1430                 case TriggerRelationId:
1431                         Assert(object->objectSubId == 0);
1432                         return OCLASS_TRIGGER;
1433
1434                 case NamespaceRelationId:
1435                         Assert(object->objectSubId == 0);
1436                         return OCLASS_SCHEMA;
1437
1438                 case AuthIdRelationId:
1439                         Assert(object->objectSubId == 0);
1440                         return OCLASS_ROLE;
1441
1442                 case DatabaseRelationId:
1443                         Assert(object->objectSubId == 0);
1444                         return OCLASS_DATABASE;
1445
1446                 case TableSpaceRelationId:
1447                         Assert(object->objectSubId == 0);
1448                         return OCLASS_TBLSPACE;
1449         }
1450
1451         /* shouldn't get here */
1452         elog(ERROR, "unrecognized object class: %u", object->classId);
1453         return OCLASS_CLASS;            /* keep compiler quiet */
1454 }
1455
1456 /*
1457  * getObjectDescription: build an object description for messages
1458  *
1459  * The result is a palloc'd string.
1460  */
1461 char *
1462 getObjectDescription(const ObjectAddress *object)
1463 {
1464         StringInfoData buffer;
1465
1466         initStringInfo(&buffer);
1467
1468         switch (getObjectClass(object))
1469         {
1470                 case OCLASS_CLASS:
1471                         getRelationDescription(&buffer, object->objectId);
1472                         if (object->objectSubId != 0)
1473                                 appendStringInfo(&buffer, _(" column %s"),
1474                                                                  get_relid_attribute_name(object->objectId,
1475                                                                                                            object->objectSubId));
1476                         break;
1477
1478                 case OCLASS_PROC:
1479                         appendStringInfo(&buffer, _("function %s"),
1480                                                          format_procedure(object->objectId));
1481                         break;
1482
1483                 case OCLASS_TYPE:
1484                         appendStringInfo(&buffer, _("type %s"),
1485                                                          format_type_be(object->objectId));
1486                         break;
1487
1488                 case OCLASS_CAST:
1489                         {
1490                                 Relation        castDesc;
1491                                 ScanKeyData skey[1];
1492                                 SysScanDesc rcscan;
1493                                 HeapTuple       tup;
1494                                 Form_pg_cast castForm;
1495
1496                                 castDesc = heap_open(CastRelationId, AccessShareLock);
1497
1498                                 ScanKeyInit(&skey[0],
1499                                                         ObjectIdAttributeNumber,
1500                                                         BTEqualStrategyNumber, F_OIDEQ,
1501                                                         ObjectIdGetDatum(object->objectId));
1502
1503                                 rcscan = systable_beginscan(castDesc, CastOidIndexId, true,
1504                                                                                         SnapshotNow, 1, skey);
1505
1506                                 tup = systable_getnext(rcscan);
1507
1508                                 if (!HeapTupleIsValid(tup))
1509                                         elog(ERROR, "could not find tuple for cast %u",
1510                                                  object->objectId);
1511
1512                                 castForm = (Form_pg_cast) GETSTRUCT(tup);
1513
1514                                 appendStringInfo(&buffer, _("cast from %s to %s"),
1515                                                                  format_type_be(castForm->castsource),
1516                                                                  format_type_be(castForm->casttarget));
1517
1518                                 systable_endscan(rcscan);
1519                                 heap_close(castDesc, AccessShareLock);
1520                                 break;
1521                         }
1522
1523                 case OCLASS_CONSTRAINT:
1524                         {
1525                                 Relation        conDesc;
1526                                 ScanKeyData skey[1];
1527                                 SysScanDesc rcscan;
1528                                 HeapTuple       tup;
1529                                 Form_pg_constraint con;
1530
1531                                 conDesc = heap_open(ConstraintRelationId, AccessShareLock);
1532
1533                                 ScanKeyInit(&skey[0],
1534                                                         ObjectIdAttributeNumber,
1535                                                         BTEqualStrategyNumber, F_OIDEQ,
1536                                                         ObjectIdGetDatum(object->objectId));
1537
1538                                 rcscan = systable_beginscan(conDesc, ConstraintOidIndexId, true,
1539                                                                                         SnapshotNow, 1, skey);
1540
1541                                 tup = systable_getnext(rcscan);
1542
1543                                 if (!HeapTupleIsValid(tup))
1544                                         elog(ERROR, "could not find tuple for constraint %u",
1545                                                  object->objectId);
1546
1547                                 con = (Form_pg_constraint) GETSTRUCT(tup);
1548
1549                                 if (OidIsValid(con->conrelid))
1550                                 {
1551                                         appendStringInfo(&buffer, _("constraint %s on "),
1552                                                                          NameStr(con->conname));
1553                                         getRelationDescription(&buffer, con->conrelid);
1554                                 }
1555                                 else
1556                                 {
1557                                         appendStringInfo(&buffer, _("constraint %s"),
1558                                                                          NameStr(con->conname));
1559                                 }
1560
1561                                 systable_endscan(rcscan);
1562                                 heap_close(conDesc, AccessShareLock);
1563                                 break;
1564                         }
1565
1566                 case OCLASS_CONVERSION:
1567                         {
1568                                 HeapTuple       conTup;
1569
1570                                 conTup = SearchSysCache(CONOID,
1571                                                                                 ObjectIdGetDatum(object->objectId),
1572                                                                                 0, 0, 0);
1573                                 if (!HeapTupleIsValid(conTup))
1574                                         elog(ERROR, "cache lookup failed for conversion %u",
1575                                                  object->objectId);
1576                                 appendStringInfo(&buffer, _("conversion %s"),
1577                                  NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname));
1578                                 ReleaseSysCache(conTup);
1579                                 break;
1580                         }
1581
1582                 case OCLASS_DEFAULT:
1583                         {
1584                                 Relation        attrdefDesc;
1585                                 ScanKeyData skey[1];
1586                                 SysScanDesc adscan;
1587                                 HeapTuple       tup;
1588                                 Form_pg_attrdef attrdef;
1589                                 ObjectAddress colobject;
1590
1591                                 attrdefDesc = heap_open(AttrDefaultRelationId, AccessShareLock);
1592
1593                                 ScanKeyInit(&skey[0],
1594                                                         ObjectIdAttributeNumber,
1595                                                         BTEqualStrategyNumber, F_OIDEQ,
1596                                                         ObjectIdGetDatum(object->objectId));
1597
1598                                 adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndexId,
1599                                                                                         true, SnapshotNow, 1, skey);
1600
1601                                 tup = systable_getnext(adscan);
1602
1603                                 if (!HeapTupleIsValid(tup))
1604                                         elog(ERROR, "could not find tuple for attrdef %u",
1605                                                  object->objectId);
1606
1607                                 attrdef = (Form_pg_attrdef) GETSTRUCT(tup);
1608
1609                                 colobject.classId = RelationRelationId;
1610                                 colobject.objectId = attrdef->adrelid;
1611                                 colobject.objectSubId = attrdef->adnum;
1612
1613                                 appendStringInfo(&buffer, _("default for %s"),
1614                                                                  getObjectDescription(&colobject));
1615
1616                                 systable_endscan(adscan);
1617                                 heap_close(attrdefDesc, AccessShareLock);
1618                                 break;
1619                         }
1620
1621                 case OCLASS_LANGUAGE:
1622                         {
1623                                 HeapTuple       langTup;
1624
1625                                 langTup = SearchSysCache(LANGOID,
1626                                                                                  ObjectIdGetDatum(object->objectId),
1627                                                                                  0, 0, 0);
1628                                 if (!HeapTupleIsValid(langTup))
1629                                         elog(ERROR, "cache lookup failed for language %u",
1630                                                  object->objectId);
1631                                 appendStringInfo(&buffer, _("language %s"),
1632                                   NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname));
1633                                 ReleaseSysCache(langTup);
1634                                 break;
1635                         }
1636
1637                 case OCLASS_OPERATOR:
1638                         appendStringInfo(&buffer, _("operator %s"),
1639                                                          format_operator(object->objectId));
1640                         break;
1641
1642                 case OCLASS_OPCLASS:
1643                         {
1644                                 HeapTuple       opcTup;
1645                                 Form_pg_opclass opcForm;
1646                                 HeapTuple       amTup;
1647                                 Form_pg_am      amForm;
1648                                 char       *nspname;
1649
1650                                 opcTup = SearchSysCache(CLAOID,
1651                                                                                 ObjectIdGetDatum(object->objectId),
1652                                                                                 0, 0, 0);
1653                                 if (!HeapTupleIsValid(opcTup))
1654                                         elog(ERROR, "cache lookup failed for opclass %u",
1655                                                  object->objectId);
1656                                 opcForm = (Form_pg_opclass) GETSTRUCT(opcTup);
1657
1658                                 amTup = SearchSysCache(AMOID,
1659                                                                            ObjectIdGetDatum(opcForm->opcamid),
1660                                                                            0, 0, 0);
1661                                 if (!HeapTupleIsValid(amTup))
1662                                         elog(ERROR, "cache lookup failed for access method %u",
1663                                                  opcForm->opcamid);
1664                                 amForm = (Form_pg_am) GETSTRUCT(amTup);
1665
1666                                 /* Qualify the name if not visible in search path */
1667                                 if (OpclassIsVisible(object->objectId))
1668                                         nspname = NULL;
1669                                 else
1670                                         nspname = get_namespace_name(opcForm->opcnamespace);
1671
1672                                 appendStringInfo(&buffer, _("operator class %s for access method %s"),
1673                                                                  quote_qualified_identifier(nspname,
1674                                                                                                   NameStr(opcForm->opcname)),
1675                                                                  NameStr(amForm->amname));
1676
1677                                 ReleaseSysCache(amTup);
1678                                 ReleaseSysCache(opcTup);
1679                                 break;
1680                         }
1681
1682                 case OCLASS_REWRITE:
1683                         {
1684                                 Relation        ruleDesc;
1685                                 ScanKeyData skey[1];
1686                                 SysScanDesc rcscan;
1687                                 HeapTuple       tup;
1688                                 Form_pg_rewrite rule;
1689
1690                                 ruleDesc = heap_open(RewriteRelationId, AccessShareLock);
1691
1692                                 ScanKeyInit(&skey[0],
1693                                                         ObjectIdAttributeNumber,
1694                                                         BTEqualStrategyNumber, F_OIDEQ,
1695                                                         ObjectIdGetDatum(object->objectId));
1696
1697                                 rcscan = systable_beginscan(ruleDesc, RewriteOidIndexId, true,
1698                                                                                         SnapshotNow, 1, skey);
1699
1700                                 tup = systable_getnext(rcscan);
1701
1702                                 if (!HeapTupleIsValid(tup))
1703                                         elog(ERROR, "could not find tuple for rule %u",
1704                                                  object->objectId);
1705
1706                                 rule = (Form_pg_rewrite) GETSTRUCT(tup);
1707
1708                                 appendStringInfo(&buffer, _("rule %s on "),
1709                                                                  NameStr(rule->rulename));
1710                                 getRelationDescription(&buffer, rule->ev_class);
1711
1712                                 systable_endscan(rcscan);
1713                                 heap_close(ruleDesc, AccessShareLock);
1714                                 break;
1715                         }
1716
1717                 case OCLASS_TRIGGER:
1718                         {
1719                                 Relation        trigDesc;
1720                                 ScanKeyData skey[1];
1721                                 SysScanDesc tgscan;
1722                                 HeapTuple       tup;
1723                                 Form_pg_trigger trig;
1724
1725                                 trigDesc = heap_open(TriggerRelationId, AccessShareLock);
1726
1727                                 ScanKeyInit(&skey[0],
1728                                                         ObjectIdAttributeNumber,
1729                                                         BTEqualStrategyNumber, F_OIDEQ,
1730                                                         ObjectIdGetDatum(object->objectId));
1731
1732                                 tgscan = systable_beginscan(trigDesc, TriggerOidIndexId, true,
1733                                                                                         SnapshotNow, 1, skey);
1734
1735                                 tup = systable_getnext(tgscan);
1736
1737                                 if (!HeapTupleIsValid(tup))
1738                                         elog(ERROR, "could not find tuple for trigger %u",
1739                                                  object->objectId);
1740
1741                                 trig = (Form_pg_trigger) GETSTRUCT(tup);
1742
1743                                 appendStringInfo(&buffer, _("trigger %s on "),
1744                                                                  NameStr(trig->tgname));
1745                                 getRelationDescription(&buffer, trig->tgrelid);
1746
1747                                 systable_endscan(tgscan);
1748                                 heap_close(trigDesc, AccessShareLock);
1749                                 break;
1750                         }
1751
1752                 case OCLASS_SCHEMA:
1753                         {
1754                                 char       *nspname;
1755
1756                                 nspname = get_namespace_name(object->objectId);
1757                                 if (!nspname)
1758                                         elog(ERROR, "cache lookup failed for namespace %u",
1759                                                  object->objectId);
1760                                 appendStringInfo(&buffer, _("schema %s"), nspname);
1761                                 break;
1762                         }
1763
1764                 case OCLASS_ROLE:
1765                         {
1766                                 appendStringInfo(&buffer, _("role %s"),
1767                                                                  GetUserNameFromId(object->objectId));
1768                                 break;
1769                         }
1770
1771                 case OCLASS_DATABASE:
1772                         {
1773                                 char       *datname;
1774
1775                                 datname = get_database_name(object->objectId);
1776                                 if (!datname)
1777                                         elog(ERROR, "cache lookup failed for database %u",
1778                                                  object->objectId);
1779                                 appendStringInfo(&buffer, _("database %s"), datname);
1780                                 break;
1781                         }
1782
1783                 case OCLASS_TBLSPACE:
1784                         {
1785                                 char       *tblspace;
1786
1787                                 tblspace = get_tablespace_name(object->objectId);
1788                                 if (!tblspace)
1789                                         elog(ERROR, "cache lookup failed for tablespace %u",
1790                                                  object->objectId);
1791                                 appendStringInfo(&buffer, _("tablespace %s"), tblspace);
1792                                 break;
1793                         }
1794
1795                 default:
1796                         appendStringInfo(&buffer, "unrecognized object %u %u %d",
1797                                                          object->classId,
1798                                                          object->objectId,
1799                                                          object->objectSubId);
1800                         break;
1801         }
1802
1803         return buffer.data;
1804 }
1805
1806 /*
1807  * subroutine for getObjectDescription: describe a relation
1808  */
1809 static void
1810 getRelationDescription(StringInfo buffer, Oid relid)
1811 {
1812         HeapTuple       relTup;
1813         Form_pg_class relForm;
1814         char       *nspname;
1815         char       *relname;
1816
1817         relTup = SearchSysCache(RELOID,
1818                                                         ObjectIdGetDatum(relid),
1819                                                         0, 0, 0);
1820         if (!HeapTupleIsValid(relTup))
1821                 elog(ERROR, "cache lookup failed for relation %u", relid);
1822         relForm = (Form_pg_class) GETSTRUCT(relTup);
1823
1824         /* Qualify the name if not visible in search path */
1825         if (RelationIsVisible(relid))
1826                 nspname = NULL;
1827         else
1828                 nspname = get_namespace_name(relForm->relnamespace);
1829
1830         relname = quote_qualified_identifier(nspname, NameStr(relForm->relname));
1831
1832         switch (relForm->relkind)
1833         {
1834                 case RELKIND_RELATION:
1835                         appendStringInfo(buffer, _("table %s"),
1836                                                          relname);
1837                         break;
1838                 case RELKIND_INDEX:
1839                         appendStringInfo(buffer, _("index %s"),
1840                                                          relname);
1841                         break;
1842                 case RELKIND_SPECIAL:
1843                         appendStringInfo(buffer, _("special system relation %s"),
1844                                                          relname);
1845                         break;
1846                 case RELKIND_SEQUENCE:
1847                         appendStringInfo(buffer, _("sequence %s"),
1848                                                          relname);
1849                         break;
1850                 case RELKIND_UNCATALOGED:
1851                         appendStringInfo(buffer, _("uncataloged table %s"),
1852                                                          relname);
1853                         break;
1854                 case RELKIND_TOASTVALUE:
1855                         appendStringInfo(buffer, _("toast table %s"),
1856                                                          relname);
1857                         break;
1858                 case RELKIND_VIEW:
1859                         appendStringInfo(buffer, _("view %s"),
1860                                                          relname);
1861                         break;
1862                 case RELKIND_COMPOSITE_TYPE:
1863                         appendStringInfo(buffer, _("composite type %s"),
1864                                                          relname);
1865                         break;
1866                 default:
1867                         /* shouldn't get here */
1868                         appendStringInfo(buffer, _("relation %s"),
1869                                                          relname);
1870                         break;
1871         }
1872
1873         ReleaseSysCache(relTup);
1874 }