]> granicus.if.org Git - postgresql/blob - src/backend/catalog/dependency.c
Rewriter and planner should use only resno, not resname, to identify
[postgresql] / src / backend / catalog / dependency.c
1 /*-------------------------------------------------------------------------
2  *
3  * dependency.c
4  *        Routines to support inter-object dependencies.
5  *
6  *
7  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.31 2003/08/11 23:04:49 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "catalog/catname.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_attrdef.h"
26 #include "catalog/pg_cast.h"
27 #include "catalog/pg_constraint.h"
28 #include "catalog/pg_conversion.h"
29 #include "catalog/pg_depend.h"
30 #include "catalog/pg_language.h"
31 #include "catalog/pg_opclass.h"
32 #include "catalog/pg_rewrite.h"
33 #include "catalog/pg_trigger.h"
34 #include "commands/comment.h"
35 #include "commands/defrem.h"
36 #include "commands/proclang.h"
37 #include "commands/schemacmds.h"
38 #include "commands/trigger.h"
39 #include "commands/typecmds.h"
40 #include "lib/stringinfo.h"
41 #include "miscadmin.h"
42 #include "optimizer/clauses.h"
43 #include "parser/parsetree.h"
44 #include "rewrite/rewriteRemove.h"
45 #include "utils/builtins.h"
46 #include "utils/fmgroids.h"
47 #include "utils/lsyscache.h"
48 #include "utils/syscache.h"
49
50
/*
 * ObjectClasses
 *
 * One enumerator per system catalog whose OID may appear in the classid
 * field of an object address.  MAX_OCLASS is not a catalog: it counts the
 * real entries above it (it dimensions the object_classes[] array), so it
 * has to remain the final enumerator.
 */
typedef enum ObjectClasses
{
	OCLASS_CLASS = 0,			/* pg_class */
	OCLASS_PROC,				/* pg_proc */
	OCLASS_TYPE,				/* pg_type */
	OCLASS_CAST,				/* pg_cast */
	OCLASS_CONSTRAINT,			/* pg_constraint */
	OCLASS_CONVERSION,			/* pg_conversion */
	OCLASS_DEFAULT,				/* pg_attrdef */
	OCLASS_LANGUAGE,			/* pg_language */
	OCLASS_OPERATOR,			/* pg_operator */
	OCLASS_OPCLASS,				/* pg_opclass */
	OCLASS_REWRITE,				/* pg_rewrite */
	OCLASS_TRIGGER,				/* pg_trigger */
	OCLASS_SCHEMA,				/* pg_namespace */
	MAX_OCLASS					/* number of classes; MUST BE LAST */
} ObjectClasses;
69
70 /* expansible list of ObjectAddresses */
71 typedef struct
72 {
73         ObjectAddress *refs;            /* => palloc'd array */
74         int                     numrefs;                /* current number of references */
75         int                     maxrefs;                /* current size of palloc'd array */
76 } ObjectAddresses;
77
78 /* for find_expr_references_walker */
79 typedef struct
80 {
81         ObjectAddresses addrs;          /* addresses being accumulated */
82         List       *rtables;            /* list of rangetables to resolve Vars */
83 } find_expr_references_context;
84
85
86 /*
87  * Because not all system catalogs have predetermined OIDs, we build a table
88  * mapping between ObjectClasses and OIDs.      This is done at most once per
89  * backend run, to minimize lookup overhead.
90  */
91 static bool object_classes_initialized = false;
92 static Oid      object_classes[MAX_OCLASS];
93
94
95 static void findAutoDeletableObjects(const ObjectAddress *object,
96                                                  ObjectAddresses *oktodelete,
97                                                  Relation depRel);
98 static bool recursiveDeletion(const ObjectAddress *object,
99                                   DropBehavior behavior,
100                                   int msglevel,
101                                   const ObjectAddress *callingObject,
102                                   ObjectAddresses *oktodelete,
103                                   Relation depRel);
104 static bool deleteDependentObjects(const ObjectAddress *object,
105                                            const char *objDescription,
106                                            DropBehavior behavior,
107                                            int msglevel,
108                                            ObjectAddresses *oktodelete,
109                                            Relation depRel);
110 static void doDeletion(const ObjectAddress *object);
111 static bool find_expr_references_walker(Node *node,
112                                                         find_expr_references_context *context);
113 static void eliminate_duplicate_dependencies(ObjectAddresses *addrs);
114 static int      object_address_comparator(const void *a, const void *b);
115 static void init_object_addresses(ObjectAddresses *addrs);
116 static void add_object_address(ObjectClasses oclass, Oid objectId, int32 subId,
117                                    ObjectAddresses *addrs);
118 static void add_exact_object_address(const ObjectAddress *object,
119                                                  ObjectAddresses *addrs);
120 static bool object_address_present(const ObjectAddress *object,
121                                            ObjectAddresses *addrs);
122 static void term_object_addresses(ObjectAddresses *addrs);
123 static void init_object_classes(void);
124 static ObjectClasses getObjectClass(const ObjectAddress *object);
125 static char *getObjectDescription(const ObjectAddress *object);
126 static void getRelationDescription(StringInfo buffer, Oid relid);
127
128
129 /*
130  * performDeletion: attempt to drop the specified object.  If CASCADE
131  * behavior is specified, also drop any dependent objects (recursively).
132  * If RESTRICT behavior is specified, error out if there are any dependent
133  * objects, except for those that should be implicitly dropped anyway
134  * according to the dependency type.
135  *
136  * This is the outer control routine for all forms of DROP that drop objects
137  * that can participate in dependencies.
138  */
139 void
140 performDeletion(const ObjectAddress *object,
141                                 DropBehavior behavior)
142 {
143         char       *objDescription;
144         Relation        depRel;
145         ObjectAddresses oktodelete;
146
147         /*
148          * Get object description for possible use in failure message. Must do
149          * this before deleting it ...
150          */
151         objDescription = getObjectDescription(object);
152
153         /*
154          * We save some cycles by opening pg_depend just once and passing the
155          * Relation pointer down to all the recursive deletion steps.
156          */
157         depRel = heap_openr(DependRelationName, RowExclusiveLock);
158
159         /*
160          * Construct a list of objects that are reachable by AUTO or INTERNAL
161          * dependencies from the target object.  These should be deleted
162          * silently, even if the actual deletion pass first reaches one of
163          * them via a non-auto dependency.
164          */
165         init_object_addresses(&oktodelete);
166
167         findAutoDeletableObjects(object, &oktodelete, depRel);
168
169         if (!recursiveDeletion(object, behavior, NOTICE,
170                                                    NULL, &oktodelete, depRel))
171                 ereport(ERROR,
172                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
173                           errmsg("cannot drop %s because other objects depend on it",
174                                          objDescription),
175                                  errhint("Use DROP ... CASCADE to drop the dependent objects too.")));
176
177         term_object_addresses(&oktodelete);
178
179         heap_close(depRel, RowExclusiveLock);
180
181         pfree(objDescription);
182 }
183
184
185 /*
186  * deleteWhatDependsOn: attempt to drop everything that depends on the
187  * specified object, though not the object itself.      Behavior is always
188  * CASCADE.
189  *
190  * This is currently used only to clean out the contents of a schema
191  * (namespace): the passed object is a namespace.  We normally want this
192  * to be done silently, so there's an option to suppress NOTICE messages.
193  */
194 void
195 deleteWhatDependsOn(const ObjectAddress *object,
196                                         bool showNotices)
197 {
198         char       *objDescription;
199         Relation        depRel;
200         ObjectAddresses oktodelete;
201
202         /*
203          * Get object description for possible use in failure messages
204          */
205         objDescription = getObjectDescription(object);
206
207         /*
208          * We save some cycles by opening pg_depend just once and passing the
209          * Relation pointer down to all the recursive deletion steps.
210          */
211         depRel = heap_openr(DependRelationName, RowExclusiveLock);
212
213         /*
214          * Construct a list of objects that are reachable by AUTO or INTERNAL
215          * dependencies from the target object.  These should be deleted
216          * silently, even if the actual deletion pass first reaches one of
217          * them via a non-auto dependency.
218          */
219         init_object_addresses(&oktodelete);
220
221         findAutoDeletableObjects(object, &oktodelete, depRel);
222
223         /*
224          * Now invoke only step 2 of recursiveDeletion: just recurse to the
225          * stuff dependent on the given object.
226          */
227         if (!deleteDependentObjects(object, objDescription,
228                                                                 DROP_CASCADE,
229                                                                 showNotices ? NOTICE : DEBUG2,
230                                                                 &oktodelete, depRel))
231                 ereport(ERROR,
232                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
233                                  errmsg("failed to drop all objects depending on %s",
234                                                 objDescription)));
235
236         /*
237          * We do not need CommandCounterIncrement here, since if step 2 did
238          * anything then each recursive call will have ended with one.
239          */
240
241         term_object_addresses(&oktodelete);
242
243         heap_close(depRel, RowExclusiveLock);
244
245         pfree(objDescription);
246 }
247
248
249 /*
250  * findAutoDeletableObjects: find all objects that are reachable by AUTO or
251  * INTERNAL dependency paths from the given object.  Add them all to the
252  * oktodelete list.  Note that the originally given object will also be
253  * added to the list.
254  *
255  * depRel is the already-open pg_depend relation.
256  */
257 static void
258 findAutoDeletableObjects(const ObjectAddress *object,
259                                                  ObjectAddresses *oktodelete,
260                                                  Relation depRel)
261 {
262         ScanKeyData key[3];
263         int                     nkeys;
264         SysScanDesc scan;
265         HeapTuple       tup;
266         ObjectAddress otherObject;
267
268         /*
269          * If this object is already in oktodelete, then we already visited
270          * it; don't do so again (this prevents infinite recursion if there's
271          * a loop in pg_depend).  Otherwise, add it.
272          */
273         if (object_address_present(object, oktodelete))
274                 return;
275         add_exact_object_address(object, oktodelete);
276
277         /*
278          * Scan pg_depend records that link to this object, showing the things
279          * that depend on it.  For each one that is AUTO or INTERNAL, visit
280          * the referencing object.
281          *
282          * When dropping a whole object (subId = 0), find pg_depend records for
283          * its sub-objects too.
284          */
285         ScanKeyEntryInitialize(&key[0], 0x0,
286                                                    Anum_pg_depend_refclassid, F_OIDEQ,
287                                                    ObjectIdGetDatum(object->classId));
288         ScanKeyEntryInitialize(&key[1], 0x0,
289                                                    Anum_pg_depend_refobjid, F_OIDEQ,
290                                                    ObjectIdGetDatum(object->objectId));
291         if (object->objectSubId != 0)
292         {
293                 ScanKeyEntryInitialize(&key[2], 0x0,
294                                                            Anum_pg_depend_refobjsubid, F_INT4EQ,
295                                                            Int32GetDatum(object->objectSubId));
296                 nkeys = 3;
297         }
298         else
299                 nkeys = 2;
300
301         scan = systable_beginscan(depRel, DependReferenceIndex, true,
302                                                           SnapshotNow, nkeys, key);
303
304         while (HeapTupleIsValid(tup = systable_getnext(scan)))
305         {
306                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
307
308                 switch (foundDep->deptype)
309                 {
310                         case DEPENDENCY_NORMAL:
311                                 /* ignore */
312                                 break;
313                         case DEPENDENCY_AUTO:
314                         case DEPENDENCY_INTERNAL:
315                                 /* recurse */
316                                 otherObject.classId = foundDep->classid;
317                                 otherObject.objectId = foundDep->objid;
318                                 otherObject.objectSubId = foundDep->objsubid;
319                                 findAutoDeletableObjects(&otherObject, oktodelete, depRel);
320                                 break;
321                         case DEPENDENCY_PIN:
322
323                                 /*
324                                  * For a PIN dependency we just ereport immediately; there
325                                  * won't be any others to examine, and we aren't ever
326                                  * going to let the user delete it.
327                                  */
328                                 ereport(ERROR,
329                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
330                                                  errmsg("cannot drop %s because it is required by the database system",
331                                                                 getObjectDescription(object))));
332                                 break;
333                         default:
334                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
335                                          foundDep->deptype, getObjectDescription(object));
336                                 break;
337                 }
338         }
339
340         systable_endscan(scan);
341 }
342
343
344 /*
345  * recursiveDeletion: delete a single object for performDeletion, plus
346  * (recursively) anything that depends on it.
347  *
348  * Returns TRUE if successful, FALSE if not.
349  *
350  * callingObject is NULL at the outer level, else identifies the object that
351  * we recursed from (the reference object that someone else needs to delete).
352  *
353  * oktodelete is a list of objects verified deletable (ie, reachable by one
354  * or more AUTO or INTERNAL dependencies from the original target).
355  *
356  * depRel is the already-open pg_depend relation.
357  *
358  *
359  * In RESTRICT mode, we perform all the deletions anyway, but ereport a message
360  * and return FALSE if we find a restriction violation.  performDeletion
361  * will then abort the transaction to nullify the deletions.  We have to
362  * do it this way to (a) report all the direct and indirect dependencies
363  * while (b) not going into infinite recursion if there's a cycle.
364  *
365  * This is even more complex than one could wish, because it is possible for
366  * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL
367  * dependencies.  Also, we might have a situation where we've been asked to
368  * delete object A, and objects B and C both have AUTO dependencies on A,
369  * but B also has a NORMAL dependency on C.  (Since any of these paths might
370  * be indirect, we can't prevent these scenarios, but must cope instead.)
371  * If we visit C before B then we would mistakenly decide that the B->C link
372  * should prevent the restricted drop from occurring.  To handle this, we make
373  * a pre-scan to find all the objects that are auto-deletable from A.  If we
374  * visit C first, but B is present in the oktodelete list, then we make no
375  * complaint but recurse to delete B anyway.  (Note that in general we must
376  * delete B before deleting C; the drop routine for B may try to access C.)
377  *
378  * Note: in the case where the path to B is traversed first, we will not
379  * see the NORMAL dependency when we reach C, because of the pg_depend
380  * removals done in step 1.  The oktodelete list is necessary just
381  * to make the behavior independent of the order in which pg_depend
382  * entries are visited.
383  */
384 static bool
385 recursiveDeletion(const ObjectAddress *object,
386                                   DropBehavior behavior,
387                                   int msglevel,
388                                   const ObjectAddress *callingObject,
389                                   ObjectAddresses *oktodelete,
390                                   Relation depRel)
391 {
392         bool            ok = true;
393         char       *objDescription;
394         ScanKeyData key[3];
395         int                     nkeys;
396         SysScanDesc scan;
397         HeapTuple       tup;
398         ObjectAddress otherObject;
399         ObjectAddress owningObject;
400         bool            amOwned = false;
401
402         /*
403          * Get object description for possible use in messages.  Must do this
404          * before deleting it ...
405          */
406         objDescription = getObjectDescription(object);
407
408         /*
409          * Step 1: find and remove pg_depend records that link from this
410          * object to others.  We have to do this anyway, and doing it first
411          * ensures that we avoid infinite recursion in the case of cycles.
412          * Also, some dependency types require extra processing here.
413          *
414          * When dropping a whole object (subId = 0), remove all pg_depend records
415          * for its sub-objects too.
416          */
417         ScanKeyEntryInitialize(&key[0], 0x0,
418                                                    Anum_pg_depend_classid, F_OIDEQ,
419                                                    ObjectIdGetDatum(object->classId));
420         ScanKeyEntryInitialize(&key[1], 0x0,
421                                                    Anum_pg_depend_objid, F_OIDEQ,
422                                                    ObjectIdGetDatum(object->objectId));
423         if (object->objectSubId != 0)
424         {
425                 ScanKeyEntryInitialize(&key[2], 0x0,
426                                                            Anum_pg_depend_objsubid, F_INT4EQ,
427                                                            Int32GetDatum(object->objectSubId));
428                 nkeys = 3;
429         }
430         else
431                 nkeys = 2;
432
433         scan = systable_beginscan(depRel, DependDependerIndex, true,
434                                                           SnapshotNow, nkeys, key);
435
436         while (HeapTupleIsValid(tup = systable_getnext(scan)))
437         {
438                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
439
440                 otherObject.classId = foundDep->refclassid;
441                 otherObject.objectId = foundDep->refobjid;
442                 otherObject.objectSubId = foundDep->refobjsubid;
443
444                 switch (foundDep->deptype)
445                 {
446                         case DEPENDENCY_NORMAL:
447                         case DEPENDENCY_AUTO:
448                                 /* no problem */
449                                 break;
450                         case DEPENDENCY_INTERNAL:
451
452                                 /*
453                                  * This object is part of the internal implementation of
454                                  * another object.      We have three cases:
455                                  *
456                                  * 1. At the outermost recursion level, disallow the DROP.
457                                  * (We just ereport here, rather than proceeding, since no
458                                  * other dependencies are likely to be interesting.)
459                                  */
460                                 if (callingObject == NULL)
461                                 {
462                                         char       *otherObjDesc = getObjectDescription(&otherObject);
463
464                                         ereport(ERROR,
465                                                  (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
466                                                   errmsg("cannot drop %s because %s requires it",
467                                                                  objDescription, otherObjDesc),
468                                                   errhint("You may drop %s instead.",
469                                                                   otherObjDesc)));
470                                 }
471
472                                 /*
473                                  * 2. When recursing from the other end of this
474                                  * dependency, it's okay to continue with the deletion.
475                                  * This holds when recursing from a whole object that
476                                  * includes the nominal other end as a component, too.
477                                  */
478                                 if (callingObject->classId == otherObject.classId &&
479                                         callingObject->objectId == otherObject.objectId &&
480                                 (callingObject->objectSubId == otherObject.objectSubId ||
481                                  callingObject->objectSubId == 0))
482                                         break;
483
484                                 /*
485                                  * 3. When recursing from anyplace else, transform this
486                                  * deletion request into a delete of the other object.
487                                  * (This will be an error condition iff RESTRICT mode.) In
488                                  * this case we finish deleting my dependencies except for
489                                  * the INTERNAL link, which will be needed to cause the
490                                  * owning object to recurse back to me.
491                                  */
492                                 if (amOwned)    /* shouldn't happen */
493                                         elog(ERROR, "multiple INTERNAL dependencies for %s",
494                                                  objDescription);
495                                 owningObject = otherObject;
496                                 amOwned = true;
497                                 /* "continue" bypasses the simple_heap_delete call below */
498                                 continue;
499                         case DEPENDENCY_PIN:
500
501                                 /*
502                                  * Should not happen; PIN dependencies should have zeroes
503                                  * in the depender fields...
504                                  */
505                                 elog(ERROR, "incorrect use of PIN dependency with %s",
506                                          objDescription);
507                                 break;
508                         default:
509                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
510                                          foundDep->deptype, objDescription);
511                                 break;
512                 }
513
514                 simple_heap_delete(depRel, &tup->t_self);
515         }
516
517         systable_endscan(scan);
518
519         /*
520          * CommandCounterIncrement here to ensure that preceding changes are
521          * all visible; in particular, that the above deletions of pg_depend
522          * entries are visible.  That prevents infinite recursion in case of a
523          * dependency loop (which is perfectly legal).
524          */
525         CommandCounterIncrement();
526
527         /*
528          * If we found we are owned by another object, ask it to delete itself
529          * instead of proceeding.  Complain if RESTRICT mode, unless the other
530          * object is in oktodelete.
531          */
532         if (amOwned)
533         {
534                 if (object_address_present(&owningObject, oktodelete))
535                         ereport(DEBUG2,
536                                         (errmsg("drop auto-cascades to %s",
537                                                         getObjectDescription(&owningObject))));
538                 else if (behavior == DROP_RESTRICT)
539                 {
540                         ereport(msglevel,
541                                         (errmsg("%s depends on %s",
542                                                         getObjectDescription(&owningObject),
543                                                         objDescription)));
544                         ok = false;
545                 }
546                 else
547                         ereport(msglevel,
548                                         (errmsg("drop cascades to %s",
549                                                         getObjectDescription(&owningObject))));
550
551                 if (!recursiveDeletion(&owningObject, behavior, msglevel,
552                                                            object, oktodelete, depRel))
553                         ok = false;
554
555                 pfree(objDescription);
556
557                 return ok;
558         }
559
560         /*
561          * Step 2: scan pg_depend records that link to this object, showing
562          * the things that depend on it.  Recursively delete those things.
563          * Note it's important to delete the dependent objects before the
564          * referenced one, since the deletion routines might do things like
565          * try to update the pg_class record when deleting a check constraint.
566          */
567         if (!deleteDependentObjects(object, objDescription,
568                                                                 behavior, msglevel,
569                                                                 oktodelete, depRel))
570                 ok = false;
571
572         /*
573          * We do not need CommandCounterIncrement here, since if step 2 did
574          * anything then each recursive call will have ended with one.
575          */
576
577         /*
578          * Step 3: delete the object itself.
579          */
580         doDeletion(object);
581
582         /*
583          * Delete any comments associated with this object.  (This is a
584          * convenient place to do it instead of having every object type know
585          * to do it.)
586          */
587         DeleteComments(object->objectId, object->classId, object->objectSubId);
588
589         /*
590          * CommandCounterIncrement here to ensure that preceding changes are
591          * all visible.
592          */
593         CommandCounterIncrement();
594
595         /*
596          * And we're done!
597          */
598         pfree(objDescription);
599
600         return ok;
601 }
602
603
604 /*
605  * deleteDependentObjects - find and delete objects that depend on 'object'
606  *
607  * Scan pg_depend records that link to the given object, showing
608  * the things that depend on it.  Recursively delete those things. (We
609  * don't delete the pg_depend records here, as the recursive call will
610  * do that.)  Note it's important to delete the dependent objects
611  * before the referenced one, since the deletion routines might do
612  * things like try to update the pg_class record when deleting a check
613  * constraint.
614  *
615  * When dropping a whole object (subId = 0), find pg_depend records for
616  * its sub-objects too.
617  *
618  *      object: the object to find dependencies on
619  *      objDescription: description of object (only used for error messages)
620  *      behavior: desired drop behavior
621  *      oktodelete: stuff that's AUTO-deletable
622  *      depRel: already opened pg_depend relation
623  *
624  * Returns TRUE if all is well, false if any problem found.
625  *
626  * NOTE: because we are using SnapshotNow, if a recursive call deletes
627  * any pg_depend tuples that our scan hasn't yet visited, we will not
628  * see them as good when we do visit them.      This is essential for
629  * correct behavior if there are multiple dependency paths between two
630  * objects --- else we might try to delete an already-deleted object.
631  */
632 static bool
633 deleteDependentObjects(const ObjectAddress *object,
634                                            const char *objDescription,
635                                            DropBehavior behavior,
636                                            int msglevel,
637                                            ObjectAddresses *oktodelete,
638                                            Relation depRel)
639 {
640         bool            ok = true;
641         ScanKeyData key[3];
642         int                     nkeys;
643         SysScanDesc scan;
644         HeapTuple       tup;
645         ObjectAddress otherObject;
646
647         ScanKeyEntryInitialize(&key[0], 0x0,
648                                                    Anum_pg_depend_refclassid, F_OIDEQ,
649                                                    ObjectIdGetDatum(object->classId));
650         ScanKeyEntryInitialize(&key[1], 0x0,
651                                                    Anum_pg_depend_refobjid, F_OIDEQ,
652                                                    ObjectIdGetDatum(object->objectId));
653         if (object->objectSubId != 0)
654         {
655                 ScanKeyEntryInitialize(&key[2], 0x0,
656                                                            Anum_pg_depend_refobjsubid, F_INT4EQ,
657                                                            Int32GetDatum(object->objectSubId));
658                 nkeys = 3;
659         }
660         else
661                 nkeys = 2;
662
663         scan = systable_beginscan(depRel, DependReferenceIndex, true,
664                                                           SnapshotNow, nkeys, key);
665
666         while (HeapTupleIsValid(tup = systable_getnext(scan)))
667         {
668                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
669
670                 otherObject.classId = foundDep->classid;
671                 otherObject.objectId = foundDep->objid;
672                 otherObject.objectSubId = foundDep->objsubid;
673
674                 switch (foundDep->deptype)
675                 {
676                         case DEPENDENCY_NORMAL:
677
678                                 /*
679                                  * Perhaps there was another dependency path that would
680                                  * have allowed silent deletion of the otherObject, had we
681                                  * only taken that path first. In that case, act like this
682                                  * link is AUTO, too.
683                                  */
684                                 if (object_address_present(&otherObject, oktodelete))
685                                         ereport(DEBUG2,
686                                                         (errmsg("drop auto-cascades to %s",
687                                                                         getObjectDescription(&otherObject))));
688                                 else if (behavior == DROP_RESTRICT)
689                                 {
690                                         ereport(msglevel,
691                                                         (errmsg("%s depends on %s",
692                                                                         getObjectDescription(&otherObject),
693                                                                         objDescription)));
694                                         ok = false;
695                                 }
696                                 else
697                                         ereport(msglevel,
698                                                         (errmsg("drop cascades to %s",
699                                                                         getObjectDescription(&otherObject))));
700
701                                 if (!recursiveDeletion(&otherObject, behavior, msglevel,
702                                                                            object, oktodelete, depRel))
703                                         ok = false;
704                                 break;
705                         case DEPENDENCY_AUTO:
706                         case DEPENDENCY_INTERNAL:
707
708                                 /*
709                                  * We propagate the DROP without complaint even in the
710                                  * RESTRICT case.  (However, normal dependencies on the
711                                  * component object could still cause failure.)
712                                  */
713                                 ereport(DEBUG2,
714                                                 (errmsg("drop auto-cascades to %s",
715                                                                 getObjectDescription(&otherObject))));
716
717                                 if (!recursiveDeletion(&otherObject, behavior, msglevel,
718                                                                            object, oktodelete, depRel))
719                                         ok = false;
720                                 break;
721                         case DEPENDENCY_PIN:
722
723                                 /*
724                                  * For a PIN dependency we just ereport immediately; there
725                                  * won't be any others to report.
726                                  */
727                                 ereport(ERROR,
728                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
729                                                  errmsg("cannot drop %s because it is required by the database system",
730                                                                 objDescription)));
731                                 break;
732                         default:
733                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
734                                          foundDep->deptype, objDescription);
735                                 break;
736                 }
737         }
738
739         systable_endscan(scan);
740
741         return ok;
742 }
743
744
745 /*
746  * doDeletion: actually delete a single object
747  */
748 static void
749 doDeletion(const ObjectAddress *object)
750 {
751         switch (getObjectClass(object))
752         {
753                 case OCLASS_CLASS:
754                         {
755                                 char            relKind = get_rel_relkind(object->objectId);
756
757                                 if (relKind == RELKIND_INDEX)
758                                 {
759                                         Assert(object->objectSubId == 0);
760                                         index_drop(object->objectId);
761                                 }
762                                 else
763                                 {
764                                         if (object->objectSubId != 0)
765                                                 RemoveAttributeById(object->objectId,
766                                                                                         object->objectSubId);
767                                         else
768                                                 heap_drop_with_catalog(object->objectId);
769                                 }
770                                 break;
771                         }
772
773                 case OCLASS_PROC:
774                         RemoveFunctionById(object->objectId);
775                         break;
776
777                 case OCLASS_TYPE:
778                         RemoveTypeById(object->objectId);
779                         break;
780
781                 case OCLASS_CAST:
782                         DropCastById(object->objectId);
783                         break;
784
785                 case OCLASS_CONSTRAINT:
786                         RemoveConstraintById(object->objectId);
787                         break;
788
789                 case OCLASS_CONVERSION:
790                         RemoveConversionById(object->objectId);
791                         break;
792
793                 case OCLASS_DEFAULT:
794                         RemoveAttrDefaultById(object->objectId);
795                         break;
796
797                 case OCLASS_LANGUAGE:
798                         DropProceduralLanguageById(object->objectId);
799                         break;
800
801                 case OCLASS_OPERATOR:
802                         RemoveOperatorById(object->objectId);
803                         break;
804
805                 case OCLASS_OPCLASS:
806                         RemoveOpClassById(object->objectId);
807                         break;
808
809                 case OCLASS_REWRITE:
810                         RemoveRewriteRuleById(object->objectId);
811                         break;
812
813                 case OCLASS_TRIGGER:
814                         RemoveTriggerById(object->objectId);
815                         break;
816
817                 case OCLASS_SCHEMA:
818                         RemoveSchemaById(object->objectId);
819                         break;
820
821                 default:
822                         elog(ERROR, "unrecognized object class: %u",
823                                  object->classId);
824         }
825 }
826
827 /*
828  * recordDependencyOnExpr - find expression dependencies
829  *
830  * This is used to find the dependencies of rules, constraint expressions,
831  * etc.
832  *
833  * Given an expression or query in node-tree form, find all the objects
834  * it refers to (tables, columns, operators, functions, etc).  Record
835  * a dependency of the specified type from the given depender object
836  * to each object mentioned in the expression.
837  *
838  * rtable is the rangetable to be used to interpret Vars with varlevelsup=0.
839  * It can be NIL if no such variables are expected.
840  *
841  * XXX is it important to create dependencies on the datatypes mentioned in
842  * the expression?      In most cases this would be redundant (eg, a ref to an
843  * operator indirectly references its input and output datatypes), but I'm
844  * not quite convinced there are no cases where we need it.
845  */
846 void
847 recordDependencyOnExpr(const ObjectAddress *depender,
848                                            Node *expr, List *rtable,
849                                            DependencyType behavior)
850 {
851         find_expr_references_context context;
852
853         init_object_addresses(&context.addrs);
854
855         /* Set up interpretation for Vars at varlevelsup = 0 */
856         context.rtables = makeList1(rtable);
857
858         /* Scan the expression tree for referenceable objects */
859         find_expr_references_walker(expr, &context);
860
861         /* Remove any duplicates */
862         eliminate_duplicate_dependencies(&context.addrs);
863
864         /* And record 'em */
865         recordMultipleDependencies(depender,
866                                                            context.addrs.refs, context.addrs.numrefs,
867                                                            behavior);
868
869         term_object_addresses(&context.addrs);
870 }
871
872 /*
873  * recordDependencyOnSingleRelExpr - find expression dependencies
874  *
875  * As above, but only one relation is expected to be referenced (with
876  * varno = 1 and varlevelsup = 0).      Pass the relation OID instead of a
877  * range table.  An additional frammish is that dependencies on that
878  * relation (or its component columns) will be marked with 'self_behavior',
879  * whereas 'behavior' is used for everything else.
880  */
881 void
882 recordDependencyOnSingleRelExpr(const ObjectAddress *depender,
883                                                                 Node *expr, Oid relId,
884                                                                 DependencyType behavior,
885                                                                 DependencyType self_behavior)
886 {
887         find_expr_references_context context;
888         RangeTblEntry rte;
889
890         init_object_addresses(&context.addrs);
891
892         /* We gin up a rather bogus rangetable list to handle Vars */
893         MemSet(&rte, 0, sizeof(rte));
894         rte.type = T_RangeTblEntry;
895         rte.rtekind = RTE_RELATION;
896         rte.relid = relId;
897
898         context.rtables = makeList1(makeList1(&rte));
899
900         /* Scan the expression tree for referenceable objects */
901         find_expr_references_walker(expr, &context);
902
903         /* Remove any duplicates */
904         eliminate_duplicate_dependencies(&context.addrs);
905
906         /* Separate self-dependencies if necessary */
907         if (behavior != self_behavior && context.addrs.numrefs > 0)
908         {
909                 ObjectAddresses self_addrs;
910                 ObjectAddress *outobj;
911                 int                     oldref,
912                                         outrefs;
913
914                 init_object_addresses(&self_addrs);
915
916                 outobj = context.addrs.refs;
917                 outrefs = 0;
918                 for (oldref = 0; oldref < context.addrs.numrefs; oldref++)
919                 {
920                         ObjectAddress *thisobj = context.addrs.refs + oldref;
921
922                         if (thisobj->classId == RelOid_pg_class &&
923                                 thisobj->objectId == relId)
924                         {
925                                 /* Move this ref into self_addrs */
926                                 add_object_address(OCLASS_CLASS, relId, thisobj->objectSubId,
927                                                                    &self_addrs);
928                         }
929                         else
930                         {
931                                 /* Keep it in context.addrs */
932                                 outobj->classId = thisobj->classId;
933                                 outobj->objectId = thisobj->objectId;
934                                 outobj->objectSubId = thisobj->objectSubId;
935                                 outobj++;
936                                 outrefs++;
937                         }
938                 }
939                 context.addrs.numrefs = outrefs;
940
941                 /* Record the self-dependencies */
942                 recordMultipleDependencies(depender,
943                                                                    self_addrs.refs, self_addrs.numrefs,
944                                                                    self_behavior);
945
946                 term_object_addresses(&self_addrs);
947         }
948
949         /* Record the external dependencies */
950         recordMultipleDependencies(depender,
951                                                            context.addrs.refs, context.addrs.numrefs,
952                                                            behavior);
953
954         term_object_addresses(&context.addrs);
955 }
956
957 /*
958  * Recursively search an expression tree for object references.
959  *
960  * Note: we avoid creating references to columns of tables that participate
961  * in an SQL JOIN construct, but are not actually used anywhere in the query.
962  * To do so, we do not scan the joinaliasvars list of a join RTE while
963  * scanning the query rangetable, but instead scan each individual entry
964  * of the alias list when we find a reference to it.
965  */
966 static bool
967 find_expr_references_walker(Node *node,
968                                                         find_expr_references_context *context)
969 {
970         if (node == NULL)
971                 return false;
972         if (IsA(node, Var))
973         {
974                 Var                *var = (Var *) node;
975                 int                     levelsup;
976                 List       *rtable,
977                                    *rtables;
978                 RangeTblEntry *rte;
979
980                 /* Find matching rtable entry, or complain if not found */
981                 levelsup = var->varlevelsup;
982                 rtables = context->rtables;
983                 while (levelsup--)
984                 {
985                         if (rtables == NIL)
986                                 break;
987                         rtables = lnext(rtables);
988                 }
989                 if (rtables == NIL)
990                         elog(ERROR, "invalid varlevelsup %d", var->varlevelsup);
991                 rtable = lfirst(rtables);
992                 if (var->varno <= 0 || var->varno > length(rtable))
993                         elog(ERROR, "invalid varno %d", var->varno);
994                 rte = rt_fetch(var->varno, rtable);
995                 if (rte->rtekind == RTE_RELATION)
996                 {
997                         /* If it's a plain relation, reference this column */
998                         /* NB: this code works for whole-row Var with attno 0, too */
999                         add_object_address(OCLASS_CLASS, rte->relid, var->varattno,
1000                                                            &context->addrs);
1001                 }
1002                 else if (rte->rtekind == RTE_JOIN)
1003                 {
1004                         /* Scan join output column to add references to join inputs */
1005                         List       *save_rtables;
1006
1007                         /* We must make the context appropriate for join's level */
1008                         save_rtables = context->rtables;
1009                         context->rtables = rtables;
1010                         if (var->varattno <= 0 ||
1011                                 var->varattno > length(rte->joinaliasvars))
1012                                 elog(ERROR, "invalid varattno %d", var->varattno);
1013                         find_expr_references_walker((Node *) nth(var->varattno - 1,
1014                                                                                                          rte->joinaliasvars),
1015                                                                                 context);
1016                         context->rtables = save_rtables;
1017                 }
1018                 return false;
1019         }
1020         if (IsA(node, FuncExpr))
1021         {
1022                 FuncExpr   *funcexpr = (FuncExpr *) node;
1023
1024                 add_object_address(OCLASS_PROC, funcexpr->funcid, 0,
1025                                                    &context->addrs);
1026                 /* fall through to examine arguments */
1027         }
1028         if (IsA(node, OpExpr))
1029         {
1030                 OpExpr     *opexpr = (OpExpr *) node;
1031
1032                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1033                                                    &context->addrs);
1034                 /* fall through to examine arguments */
1035         }
1036         if (IsA(node, DistinctExpr))
1037         {
1038                 DistinctExpr *distinctexpr = (DistinctExpr *) node;
1039
1040                 add_object_address(OCLASS_OPERATOR, distinctexpr->opno, 0,
1041                                                    &context->addrs);
1042                 /* fall through to examine arguments */
1043         }
1044         if (IsA(node, ScalarArrayOpExpr))
1045         {
1046                 ScalarArrayOpExpr *opexpr = (ScalarArrayOpExpr *) node;
1047
1048                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1049                                                    &context->addrs);
1050                 /* fall through to examine arguments */
1051         }
1052         if (IsA(node, NullIfExpr))
1053         {
1054                 NullIfExpr *nullifexpr = (NullIfExpr *) node;
1055
1056                 add_object_address(OCLASS_OPERATOR, nullifexpr->opno, 0,
1057                                                    &context->addrs);
1058                 /* fall through to examine arguments */
1059         }
1060         if (IsA(node, Aggref))
1061         {
1062                 Aggref     *aggref = (Aggref *) node;
1063
1064                 add_object_address(OCLASS_PROC, aggref->aggfnoid, 0,
1065                                                    &context->addrs);
1066                 /* fall through to examine arguments */
1067         }
1068         if (IsA(node, SubLink))
1069         {
1070                 SubLink    *sublink = (SubLink *) node;
1071                 List       *opid;
1072
1073                 foreach(opid, sublink->operOids)
1074                 {
1075                         add_object_address(OCLASS_OPERATOR, lfirsto(opid), 0,
1076                                                            &context->addrs);
1077                 }
1078                 /* fall through to examine arguments */
1079         }
1080         if (is_subplan(node))
1081         {
1082                 /* Extra work needed here if we ever need this case */
1083                 elog(ERROR, "already-planned subqueries not supported");
1084         }
1085         if (IsA(node, Query))
1086         {
1087                 /* Recurse into RTE subquery or not-yet-planned sublink subquery */
1088                 Query      *query = (Query *) node;
1089                 List       *rtable;
1090                 bool            result;
1091
1092                 /*
1093                  * Add whole-relation refs for each plain relation mentioned in
1094                  * the subquery's rtable.  (Note: query_tree_walker takes care of
1095                  * recursing into RTE_FUNCTION and RTE_SUBQUERY RTEs, so no need
1096                  * to do that here.  But keep it from looking at join alias
1097                  * lists.)
1098                  */
1099                 foreach(rtable, query->rtable)
1100                 {
1101                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
1102
1103                         if (rte->rtekind == RTE_RELATION)
1104                                 add_object_address(OCLASS_CLASS, rte->relid, 0,
1105                                                                    &context->addrs);
1106                 }
1107
1108                 /* Examine substructure of query */
1109                 context->rtables = lcons(query->rtable, context->rtables);
1110                 result = query_tree_walker(query,
1111                                                                    find_expr_references_walker,
1112                                                                    (void *) context,
1113                                                                    QTW_IGNORE_JOINALIASES);
1114                 context->rtables = lnext(context->rtables);
1115                 return result;
1116         }
1117         return expression_tree_walker(node, find_expr_references_walker,
1118                                                                   (void *) context);
1119 }
1120
1121 /*
1122  * Given an array of dependency references, eliminate any duplicates.
1123  */
1124 static void
1125 eliminate_duplicate_dependencies(ObjectAddresses *addrs)
1126 {
1127         ObjectAddress *priorobj;
1128         int                     oldref,
1129                                 newrefs;
1130
1131         if (addrs->numrefs <= 1)
1132                 return;                                 /* nothing to do */
1133
1134         /* Sort the refs so that duplicates are adjacent */
1135         qsort((void *) addrs->refs, addrs->numrefs, sizeof(ObjectAddress),
1136                   object_address_comparator);
1137
1138         /* Remove dups */
1139         priorobj = addrs->refs;
1140         newrefs = 1;
1141         for (oldref = 1; oldref < addrs->numrefs; oldref++)
1142         {
1143                 ObjectAddress *thisobj = addrs->refs + oldref;
1144
1145                 if (priorobj->classId == thisobj->classId &&
1146                         priorobj->objectId == thisobj->objectId)
1147                 {
1148                         if (priorobj->objectSubId == thisobj->objectSubId)
1149                                 continue;               /* identical, so drop thisobj */
1150
1151                         /*
1152                          * If we have a whole-object reference and a reference to a
1153                          * part of the same object, we don't need the whole-object
1154                          * reference (for example, we don't need to reference both
1155                          * table foo and column foo.bar).  The whole-object reference
1156                          * will always appear first in the sorted list.
1157                          */
1158                         if (priorobj->objectSubId == 0)
1159                         {
1160                                 /* replace whole ref with partial */
1161                                 priorobj->objectSubId = thisobj->objectSubId;
1162                                 continue;
1163                         }
1164                 }
1165                 /* Not identical, so add thisobj to output set */
1166                 priorobj++;
1167                 priorobj->classId = thisobj->classId;
1168                 priorobj->objectId = thisobj->objectId;
1169                 priorobj->objectSubId = thisobj->objectSubId;
1170                 newrefs++;
1171         }
1172
1173         addrs->numrefs = newrefs;
1174 }
1175
1176 /*
1177  * qsort comparator for ObjectAddress items
1178  */
1179 static int
1180 object_address_comparator(const void *a, const void *b)
1181 {
1182         const ObjectAddress *obja = (const ObjectAddress *) a;
1183         const ObjectAddress *objb = (const ObjectAddress *) b;
1184
1185         if (obja->classId < objb->classId)
1186                 return -1;
1187         if (obja->classId > objb->classId)
1188                 return 1;
1189         if (obja->objectId < objb->objectId)
1190                 return -1;
1191         if (obja->objectId > objb->objectId)
1192                 return 1;
1193
1194         /*
1195          * We sort the subId as an unsigned int so that 0 will come first. See
1196          * logic in eliminate_duplicate_dependencies.
1197          */
1198         if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId)
1199                 return -1;
1200         if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId)
1201                 return 1;
1202         return 0;
1203 }
1204
1205 /*
1206  * Routines for handling an expansible array of ObjectAddress items.
1207  *
1208  * init_object_addresses: initialize an ObjectAddresses array.
1209  */
1210 static void
1211 init_object_addresses(ObjectAddresses *addrs)
1212 {
1213         /* Initialize array to empty */
1214         addrs->numrefs = 0;
1215         addrs->maxrefs = 32;            /* arbitrary initial array size */
1216         addrs->refs = (ObjectAddress *)
1217                 palloc(addrs->maxrefs * sizeof(ObjectAddress));
1218
1219         /* Initialize object_classes[] if not done yet */
1220         /* This will be needed by add_object_address() */
1221         if (!object_classes_initialized)
1222                 init_object_classes();
1223 }
1224
1225 /*
1226  * Add an entry to an ObjectAddresses array.
1227  *
1228  * It is convenient to specify the class by ObjectClass rather than directly
1229  * by catalog OID.
1230  */
1231 static void
1232 add_object_address(ObjectClasses oclass, Oid objectId, int32 subId,
1233                                    ObjectAddresses *addrs)
1234 {
1235         ObjectAddress *item;
1236
1237         /* enlarge array if needed */
1238         if (addrs->numrefs >= addrs->maxrefs)
1239         {
1240                 addrs->maxrefs *= 2;
1241                 addrs->refs = (ObjectAddress *)
1242                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1243         }
1244         /* record this item */
1245         item = addrs->refs + addrs->numrefs;
1246         item->classId = object_classes[oclass];
1247         item->objectId = objectId;
1248         item->objectSubId = subId;
1249         addrs->numrefs++;
1250 }
1251
1252 /*
1253  * Add an entry to an ObjectAddresses array.
1254  *
1255  * As above, but specify entry exactly.
1256  */
1257 static void
1258 add_exact_object_address(const ObjectAddress *object,
1259                                                  ObjectAddresses *addrs)
1260 {
1261         ObjectAddress *item;
1262
1263         /* enlarge array if needed */
1264         if (addrs->numrefs >= addrs->maxrefs)
1265         {
1266                 addrs->maxrefs *= 2;
1267                 addrs->refs = (ObjectAddress *)
1268                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1269         }
1270         /* record this item */
1271         item = addrs->refs + addrs->numrefs;
1272         *item = *object;
1273         addrs->numrefs++;
1274 }
1275
1276 /*
1277  * Test whether an object is present in an ObjectAddresses array.
1278  *
1279  * We return "true" if object is a subobject of something in the array, too.
1280  */
1281 static bool
1282 object_address_present(const ObjectAddress *object,
1283                                            ObjectAddresses *addrs)
1284 {
1285         int                     i;
1286
1287         for (i = addrs->numrefs - 1; i >= 0; i--)
1288         {
1289                 ObjectAddress *thisobj = addrs->refs + i;
1290
1291                 if (object->classId == thisobj->classId &&
1292                         object->objectId == thisobj->objectId)
1293                 {
1294                         if (object->objectSubId == thisobj->objectSubId ||
1295                                 thisobj->objectSubId == 0)
1296                                 return true;
1297                 }
1298         }
1299
1300         return false;
1301 }
1302
1303 /*
1304  * Clean up when done with an ObjectAddresses array.
1305  */
1306 static void
1307 term_object_addresses(ObjectAddresses *addrs)
1308 {
1309         pfree(addrs->refs);
1310 }
1311
1312 /*
1313  * Initialize the object_classes[] table.
1314  *
1315  * Although some of these OIDs aren't compile-time constants, they surely
1316  * shouldn't change during a backend's run.  So, we look them up the
1317  * first time through and then cache them.
1318  */
1319 static void
1320 init_object_classes(void)
1321 {
1322         object_classes[OCLASS_CLASS] = RelOid_pg_class;
1323         object_classes[OCLASS_PROC] = RelOid_pg_proc;
1324         object_classes[OCLASS_TYPE] = RelOid_pg_type;
1325         object_classes[OCLASS_CAST] = get_system_catalog_relid(CastRelationName);
1326         object_classes[OCLASS_CONSTRAINT] = get_system_catalog_relid(ConstraintRelationName);
1327         object_classes[OCLASS_CONVERSION] = get_system_catalog_relid(ConversionRelationName);
1328         object_classes[OCLASS_DEFAULT] = get_system_catalog_relid(AttrDefaultRelationName);
1329         object_classes[OCLASS_LANGUAGE] = get_system_catalog_relid(LanguageRelationName);
1330         object_classes[OCLASS_OPERATOR] = get_system_catalog_relid(OperatorRelationName);
1331         object_classes[OCLASS_OPCLASS] = get_system_catalog_relid(OperatorClassRelationName);
1332         object_classes[OCLASS_REWRITE] = get_system_catalog_relid(RewriteRelationName);
1333         object_classes[OCLASS_TRIGGER] = get_system_catalog_relid(TriggerRelationName);
1334         object_classes[OCLASS_SCHEMA] = get_system_catalog_relid(NamespaceRelationName);
1335         object_classes_initialized = true;
1336 }
1337
1338 /*
1339  * Determine the class of a given object identified by objectAddress.
1340  *
1341  * This function is needed just because some of the system catalogs do
1342  * not have hardwired-at-compile-time OIDs.
1343  */
1344 static ObjectClasses
1345 getObjectClass(const ObjectAddress *object)
1346 {
1347         /* Easy for the bootstrapped catalogs... */
1348         switch (object->classId)
1349         {
1350                 case RelOid_pg_class:
1351                         /* caller must check objectSubId */
1352                         return OCLASS_CLASS;
1353
1354                 case RelOid_pg_proc:
1355                         Assert(object->objectSubId == 0);
1356                         return OCLASS_PROC;
1357
1358                 case RelOid_pg_type:
1359                         Assert(object->objectSubId == 0);
1360                         return OCLASS_TYPE;
1361         }
1362
1363         /*
1364          * Handle cases where catalog's OID is not hardwired.
1365          */
1366         if (!object_classes_initialized)
1367                 init_object_classes();
1368
1369         if (object->classId == object_classes[OCLASS_CAST])
1370         {
1371                 Assert(object->objectSubId == 0);
1372                 return OCLASS_CAST;
1373         }
1374         if (object->classId == object_classes[OCLASS_CONSTRAINT])
1375         {
1376                 Assert(object->objectSubId == 0);
1377                 return OCLASS_CONSTRAINT;
1378         }
1379         if (object->classId == object_classes[OCLASS_CONVERSION])
1380         {
1381                 Assert(object->objectSubId == 0);
1382                 return OCLASS_CONVERSION;
1383         }
1384         if (object->classId == object_classes[OCLASS_DEFAULT])
1385         {
1386                 Assert(object->objectSubId == 0);
1387                 return OCLASS_DEFAULT;
1388         }
1389         if (object->classId == object_classes[OCLASS_LANGUAGE])
1390         {
1391                 Assert(object->objectSubId == 0);
1392                 return OCLASS_LANGUAGE;
1393         }
1394         if (object->classId == object_classes[OCLASS_OPERATOR])
1395         {
1396                 Assert(object->objectSubId == 0);
1397                 return OCLASS_OPERATOR;
1398         }
1399         if (object->classId == object_classes[OCLASS_OPCLASS])
1400         {
1401                 Assert(object->objectSubId == 0);
1402                 return OCLASS_OPCLASS;
1403         }
1404         if (object->classId == object_classes[OCLASS_REWRITE])
1405         {
1406                 Assert(object->objectSubId == 0);
1407                 return OCLASS_REWRITE;
1408         }
1409         if (object->classId == object_classes[OCLASS_TRIGGER])
1410         {
1411                 Assert(object->objectSubId == 0);
1412                 return OCLASS_TRIGGER;
1413         }
1414         if (object->classId == object_classes[OCLASS_SCHEMA])
1415         {
1416                 Assert(object->objectSubId == 0);
1417                 return OCLASS_SCHEMA;
1418         }
1419
1420         elog(ERROR, "unrecognized object class: %u", object->classId);
1421         return OCLASS_CLASS;            /* keep compiler quiet */
1422 }
1423
1424 /*
1425  * getObjectDescription: build an object description for messages
1426  *
1427  * The result is a palloc'd string.
1428  */
1429 static char *
1430 getObjectDescription(const ObjectAddress *object)
1431 {
1432         StringInfoData buffer;
1433
1434         initStringInfo(&buffer);
1435
1436         switch (getObjectClass(object))
1437         {
1438                 case OCLASS_CLASS:
1439                         getRelationDescription(&buffer, object->objectId);
1440                         if (object->objectSubId != 0)
1441                                 appendStringInfo(&buffer, " column %s",
1442                                                                  get_relid_attribute_name(object->objectId,
1443                                                                                                                   object->objectSubId));
1444                         break;
1445
1446                 case OCLASS_PROC:
1447                         appendStringInfo(&buffer, "function %s",
1448                                                          format_procedure(object->objectId));
1449                         break;
1450
1451                 case OCLASS_TYPE:
1452                         appendStringInfo(&buffer, "type %s",
1453                                                          format_type_be(object->objectId));
1454                         break;
1455
1456                 case OCLASS_CAST:
1457                         {
1458                                 Relation        castDesc;
1459                                 ScanKeyData skey[1];
1460                                 SysScanDesc rcscan;
1461                                 HeapTuple       tup;
1462                                 Form_pg_cast castForm;
1463
1464                                 castDesc = heap_openr(CastRelationName, AccessShareLock);
1465
1466                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1467                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1468                                                                          ObjectIdGetDatum(object->objectId));
1469
1470                                 rcscan = systable_beginscan(castDesc, CastOidIndex, true,
1471                                                                                         SnapshotNow, 1, skey);
1472
1473                                 tup = systable_getnext(rcscan);
1474
1475                                 if (!HeapTupleIsValid(tup))
1476                                         elog(ERROR, "could not find tuple for cast %u",
1477                                                  object->objectId);
1478
1479                                 castForm = (Form_pg_cast) GETSTRUCT(tup);
1480
1481                                 appendStringInfo(&buffer, "cast from %s to %s",
1482                                                                  format_type_be(castForm->castsource),
1483                                                                  format_type_be(castForm->casttarget));
1484
1485                                 systable_endscan(rcscan);
1486                                 heap_close(castDesc, AccessShareLock);
1487                                 break;
1488                         }
1489
1490                 case OCLASS_CONSTRAINT:
1491                         {
1492                                 Relation        conDesc;
1493                                 ScanKeyData skey[1];
1494                                 SysScanDesc rcscan;
1495                                 HeapTuple       tup;
1496                                 Form_pg_constraint con;
1497
1498                                 conDesc = heap_openr(ConstraintRelationName, AccessShareLock);
1499
1500                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1501                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1502                                                                          ObjectIdGetDatum(object->objectId));
1503
1504                                 rcscan = systable_beginscan(conDesc, ConstraintOidIndex, true,
1505                                                                                         SnapshotNow, 1, skey);
1506
1507                                 tup = systable_getnext(rcscan);
1508
1509                                 if (!HeapTupleIsValid(tup))
1510                                         elog(ERROR, "could not find tuple for constraint %u",
1511                                                  object->objectId);
1512
1513                                 con = (Form_pg_constraint) GETSTRUCT(tup);
1514
1515                                 if (OidIsValid(con->conrelid))
1516                                 {
1517                                         appendStringInfo(&buffer, "constraint %s on ",
1518                                                                          NameStr(con->conname));
1519                                         getRelationDescription(&buffer, con->conrelid);
1520                                 }
1521                                 else
1522                                 {
1523                                         appendStringInfo(&buffer, "constraint %s",
1524                                                                          NameStr(con->conname));
1525                                 }
1526
1527                                 systable_endscan(rcscan);
1528                                 heap_close(conDesc, AccessShareLock);
1529                                 break;
1530                         }
1531
1532                 case OCLASS_CONVERSION:
1533                         {
1534                                 HeapTuple       conTup;
1535
1536                                 conTup = SearchSysCache(CONOID,
1537                                                                           ObjectIdGetDatum(object->objectId),
1538                                                                                 0, 0, 0);
1539                                 if (!HeapTupleIsValid(conTup))
1540                                         elog(ERROR, "cache lookup failed for conversion %u",
1541                                                  object->objectId);
1542                                 appendStringInfo(&buffer, "conversion %s",
1543                                                                  NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname));
1544                                 ReleaseSysCache(conTup);
1545                                 break;
1546                         }
1547
1548                 case OCLASS_DEFAULT:
1549                         {
1550                                 Relation        attrdefDesc;
1551                                 ScanKeyData skey[1];
1552                                 SysScanDesc adscan;
1553                                 HeapTuple       tup;
1554                                 Form_pg_attrdef attrdef;
1555                                 ObjectAddress colobject;
1556
1557                                 attrdefDesc = heap_openr(AttrDefaultRelationName, AccessShareLock);
1558
1559                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1560                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1561                                                                          ObjectIdGetDatum(object->objectId));
1562
1563                                 adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndex, true,
1564                                                                                         SnapshotNow, 1, skey);
1565
1566                                 tup = systable_getnext(adscan);
1567
1568                                 if (!HeapTupleIsValid(tup))
1569                                         elog(ERROR, "could not find tuple for attrdef %u",
1570                                                  object->objectId);
1571
1572                                 attrdef = (Form_pg_attrdef) GETSTRUCT(tup);
1573
1574                                 colobject.classId = RelOid_pg_class;
1575                                 colobject.objectId = attrdef->adrelid;
1576                                 colobject.objectSubId = attrdef->adnum;
1577
1578                                 appendStringInfo(&buffer, "default for %s",
1579                                                                  getObjectDescription(&colobject));
1580
1581                                 systable_endscan(adscan);
1582                                 heap_close(attrdefDesc, AccessShareLock);
1583                                 break;
1584                         }
1585
1586                 case OCLASS_LANGUAGE:
1587                         {
1588                                 HeapTuple       langTup;
1589
1590                                 langTup = SearchSysCache(LANGOID,
1591                                                                           ObjectIdGetDatum(object->objectId),
1592                                                                                  0, 0, 0);
1593                                 if (!HeapTupleIsValid(langTup))
1594                                         elog(ERROR, "cache lookup failed for language %u",
1595                                                  object->objectId);
1596                                 appendStringInfo(&buffer, "language %s",
1597                                                                  NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname));
1598                                 ReleaseSysCache(langTup);
1599                                 break;
1600                         }
1601
1602                 case OCLASS_OPERATOR:
1603                         appendStringInfo(&buffer, "operator %s",
1604                                                          format_operator(object->objectId));
1605                         break;
1606
1607                 case OCLASS_OPCLASS:
1608                         {
1609                                 HeapTuple       opcTup;
1610                                 Form_pg_opclass opcForm;
1611                                 HeapTuple       amTup;
1612                                 Form_pg_am      amForm;
1613                                 char       *nspname;
1614
1615                                 opcTup = SearchSysCache(CLAOID,
1616                                                                           ObjectIdGetDatum(object->objectId),
1617                                                                                 0, 0, 0);
1618                                 if (!HeapTupleIsValid(opcTup))
1619                                         elog(ERROR, "cache lookup failed for opclass %u",
1620                                                  object->objectId);
1621                                 opcForm = (Form_pg_opclass) GETSTRUCT(opcTup);
1622
1623                                 /* Qualify the name if not visible in search path */
1624                                 if (OpclassIsVisible(object->objectId))
1625                                         nspname = NULL;
1626                                 else
1627                                         nspname = get_namespace_name(opcForm->opcnamespace);
1628
1629                                 appendStringInfo(&buffer, "operator class %s",
1630                                                                  quote_qualified_identifier(nspname,
1631                                                                                          NameStr(opcForm->opcname)));
1632
1633                                 amTup = SearchSysCache(AMOID,
1634                                                                            ObjectIdGetDatum(opcForm->opcamid),
1635                                                                            0, 0, 0);
1636                                 if (!HeapTupleIsValid(amTup))
1637                                         elog(ERROR, "cache lookup failed for access method %u",
1638                                                  opcForm->opcamid);
1639                                 amForm = (Form_pg_am) GETSTRUCT(amTup);
1640
1641                                 appendStringInfo(&buffer, " for %s",
1642                                                                  NameStr(amForm->amname));
1643
1644                                 ReleaseSysCache(amTup);
1645                                 ReleaseSysCache(opcTup);
1646                                 break;
1647                         }
1648
1649                 case OCLASS_REWRITE:
1650                         {
1651                                 Relation        ruleDesc;
1652                                 ScanKeyData skey[1];
1653                                 SysScanDesc rcscan;
1654                                 HeapTuple       tup;
1655                                 Form_pg_rewrite rule;
1656
1657                                 ruleDesc = heap_openr(RewriteRelationName, AccessShareLock);
1658
1659                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1660                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1661                                                                          ObjectIdGetDatum(object->objectId));
1662
1663                                 rcscan = systable_beginscan(ruleDesc, RewriteOidIndex, true,
1664                                                                                         SnapshotNow, 1, skey);
1665
1666                                 tup = systable_getnext(rcscan);
1667
1668                                 if (!HeapTupleIsValid(tup))
1669                                         elog(ERROR, "could not find tuple for rule %u",
1670                                                  object->objectId);
1671
1672                                 rule = (Form_pg_rewrite) GETSTRUCT(tup);
1673
1674                                 appendStringInfo(&buffer, "rule %s on ",
1675                                                                  NameStr(rule->rulename));
1676                                 getRelationDescription(&buffer, rule->ev_class);
1677
1678                                 systable_endscan(rcscan);
1679                                 heap_close(ruleDesc, AccessShareLock);
1680                                 break;
1681                         }
1682
1683                 case OCLASS_TRIGGER:
1684                         {
1685                                 Relation        trigDesc;
1686                                 ScanKeyData skey[1];
1687                                 SysScanDesc tgscan;
1688                                 HeapTuple       tup;
1689                                 Form_pg_trigger trig;
1690
1691                                 trigDesc = heap_openr(TriggerRelationName, AccessShareLock);
1692
1693                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1694                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1695                                                                          ObjectIdGetDatum(object->objectId));
1696
1697                                 tgscan = systable_beginscan(trigDesc, TriggerOidIndex, true,
1698                                                                                         SnapshotNow, 1, skey);
1699
1700                                 tup = systable_getnext(tgscan);
1701
1702                                 if (!HeapTupleIsValid(tup))
1703                                         elog(ERROR, "could not find tuple for trigger %u",
1704                                                  object->objectId);
1705
1706                                 trig = (Form_pg_trigger) GETSTRUCT(tup);
1707
1708                                 appendStringInfo(&buffer, "trigger %s on ",
1709                                                                  NameStr(trig->tgname));
1710                                 getRelationDescription(&buffer, trig->tgrelid);
1711
1712                                 systable_endscan(tgscan);
1713                                 heap_close(trigDesc, AccessShareLock);
1714                                 break;
1715                         }
1716
1717                 case OCLASS_SCHEMA:
1718                         {
1719                                 char       *nspname;
1720
1721                                 nspname = get_namespace_name(object->objectId);
1722                                 if (!nspname)
1723                                         elog(ERROR, "cache lookup failed for namespace %u",
1724                                                  object->objectId);
1725                                 appendStringInfo(&buffer, "schema %s", nspname);
1726                                 break;
1727                         }
1728
1729                 default:
1730                         appendStringInfo(&buffer, "unrecognized object %u %u %d",
1731                                                          object->classId,
1732                                                          object->objectId,
1733                                                          object->objectSubId);
1734                         break;
1735         }
1736
1737         return buffer.data;
1738 }
1739
1740 /*
1741  * subroutine for getObjectDescription: describe a relation
1742  */
1743 static void
1744 getRelationDescription(StringInfo buffer, Oid relid)
1745 {
1746         HeapTuple       relTup;
1747         Form_pg_class relForm;
1748         char       *nspname;
1749         char       *relname;
1750
1751         relTup = SearchSysCache(RELOID,
1752                                                         ObjectIdGetDatum(relid),
1753                                                         0, 0, 0);
1754         if (!HeapTupleIsValid(relTup))
1755                 elog(ERROR, "cache lookup failed for relation %u", relid);
1756         relForm = (Form_pg_class) GETSTRUCT(relTup);
1757
1758         /* Qualify the name if not visible in search path */
1759         if (RelationIsVisible(relid))
1760                 nspname = NULL;
1761         else
1762                 nspname = get_namespace_name(relForm->relnamespace);
1763
1764         relname = quote_qualified_identifier(nspname, NameStr(relForm->relname));
1765
1766         switch (relForm->relkind)
1767         {
1768                 case RELKIND_RELATION:
1769                         appendStringInfo(buffer, "table %s",
1770                                                          relname);
1771                         break;
1772                 case RELKIND_INDEX:
1773                         appendStringInfo(buffer, "index %s",
1774                                                          relname);
1775                         break;
1776                 case RELKIND_SPECIAL:
1777                         appendStringInfo(buffer, "special system relation %s",
1778                                                          relname);
1779                         break;
1780                 case RELKIND_SEQUENCE:
1781                         appendStringInfo(buffer, "sequence %s",
1782                                                          relname);
1783                         break;
1784                 case RELKIND_UNCATALOGED:
1785                         appendStringInfo(buffer, "uncataloged table %s",
1786                                                          relname);
1787                         break;
1788                 case RELKIND_TOASTVALUE:
1789                         appendStringInfo(buffer, "toast table %s",
1790                                                          relname);
1791                         break;
1792                 case RELKIND_VIEW:
1793                         appendStringInfo(buffer, "view %s",
1794                                                          relname);
1795                         break;
1796                 case RELKIND_COMPOSITE_TYPE:
1797                         appendStringInfo(buffer, "composite type %s",
1798                                                          relname);
1799                         break;
1800                 default:
1801                         /* shouldn't get here */
1802                         appendStringInfo(buffer, "relation %s",
1803                                                          relname);
1804                         break;
1805         }
1806
1807         ReleaseSysCache(relTup);
1808 }