]> granicus.if.org Git - postgresql/blob - src/backend/catalog/dependency.c
d64f027ef64a21a14440221d8353ebbeb431d28a
[postgresql] / src / backend / catalog / dependency.c
1 /*-------------------------------------------------------------------------
2  *
3  * dependency.c
4  *        Routines to support inter-object dependencies.
5  *
6  *
7  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.33 2003/11/12 21:15:48 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "catalog/catname.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_attrdef.h"
26 #include "catalog/pg_cast.h"
27 #include "catalog/pg_constraint.h"
28 #include "catalog/pg_conversion.h"
29 #include "catalog/pg_depend.h"
30 #include "catalog/pg_language.h"
31 #include "catalog/pg_opclass.h"
32 #include "catalog/pg_rewrite.h"
33 #include "catalog/pg_trigger.h"
34 #include "commands/comment.h"
35 #include "commands/defrem.h"
36 #include "commands/proclang.h"
37 #include "commands/schemacmds.h"
38 #include "commands/trigger.h"
39 #include "commands/typecmds.h"
40 #include "lib/stringinfo.h"
41 #include "miscadmin.h"
42 #include "optimizer/clauses.h"
43 #include "parser/parsetree.h"
44 #include "rewrite/rewriteRemove.h"
45 #include "utils/builtins.h"
46 #include "utils/fmgroids.h"
47 #include "utils/lsyscache.h"
48 #include "utils/syscache.h"
49
50
51 /* This enum covers all system catalogs whose OIDs can appear in classid. */
52 typedef enum ObjectClasses
53 {
54         OCLASS_CLASS,                           /* pg_class */
55         OCLASS_PROC,                            /* pg_proc */
56         OCLASS_TYPE,                            /* pg_type */
57         OCLASS_CAST,                            /* pg_cast */
58         OCLASS_CONSTRAINT,                      /* pg_constraint */
59         OCLASS_CONVERSION,                      /* pg_conversion */
60         OCLASS_DEFAULT,                         /* pg_attrdef */
61         OCLASS_LANGUAGE,                        /* pg_language */
62         OCLASS_OPERATOR,                        /* pg_operator */
63         OCLASS_OPCLASS,                         /* pg_opclass */
64         OCLASS_REWRITE,                         /* pg_rewrite */
65         OCLASS_TRIGGER,                         /* pg_trigger */
66         OCLASS_SCHEMA,                          /* pg_namespace */
67         MAX_OCLASS                                      /* MUST BE LAST */
68 } ObjectClasses;
69
70 /* expansible list of ObjectAddresses */
71 typedef struct
72 {
73         ObjectAddress *refs;            /* => palloc'd array */
74         int                     numrefs;                /* current number of references */
75         int                     maxrefs;                /* current size of palloc'd array */
76 } ObjectAddresses;
77
78 /* for find_expr_references_walker */
79 typedef struct
80 {
81         ObjectAddresses addrs;          /* addresses being accumulated */
82         List       *rtables;            /* list of rangetables to resolve Vars */
83 } find_expr_references_context;
84
85
86 /*
87  * Because not all system catalogs have predetermined OIDs, we build a table
88  * mapping between ObjectClasses and OIDs.      This is done at most once per
89  * backend run, to minimize lookup overhead.
90  */
91 static bool object_classes_initialized = false;
92 static Oid      object_classes[MAX_OCLASS];
93
94
95 static void findAutoDeletableObjects(const ObjectAddress *object,
96                                                  ObjectAddresses *oktodelete,
97                                                  Relation depRel);
98 static bool recursiveDeletion(const ObjectAddress *object,
99                                   DropBehavior behavior,
100                                   int msglevel,
101                                   const ObjectAddress *callingObject,
102                                   ObjectAddresses *oktodelete,
103                                   Relation depRel);
104 static bool deleteDependentObjects(const ObjectAddress *object,
105                                            const char *objDescription,
106                                            DropBehavior behavior,
107                                            int msglevel,
108                                            ObjectAddresses *oktodelete,
109                                            Relation depRel);
110 static void doDeletion(const ObjectAddress *object);
111 static bool find_expr_references_walker(Node *node,
112                                                         find_expr_references_context *context);
113 static void eliminate_duplicate_dependencies(ObjectAddresses *addrs);
114 static int      object_address_comparator(const void *a, const void *b);
115 static void init_object_addresses(ObjectAddresses *addrs);
116 static void add_object_address(ObjectClasses oclass, Oid objectId, int32 subId,
117                                    ObjectAddresses *addrs);
118 static void add_exact_object_address(const ObjectAddress *object,
119                                                  ObjectAddresses *addrs);
120 static bool object_address_present(const ObjectAddress *object,
121                                            ObjectAddresses *addrs);
122 static void term_object_addresses(ObjectAddresses *addrs);
123 static void init_object_classes(void);
124 static ObjectClasses getObjectClass(const ObjectAddress *object);
125 static char *getObjectDescription(const ObjectAddress *object);
126 static void getRelationDescription(StringInfo buffer, Oid relid);
127
128
129 /*
130  * performDeletion: attempt to drop the specified object.  If CASCADE
131  * behavior is specified, also drop any dependent objects (recursively).
132  * If RESTRICT behavior is specified, error out if there are any dependent
133  * objects, except for those that should be implicitly dropped anyway
134  * according to the dependency type.
135  *
136  * This is the outer control routine for all forms of DROP that drop objects
137  * that can participate in dependencies.
138  */
139 void
140 performDeletion(const ObjectAddress *object,
141                                 DropBehavior behavior)
142 {
143         char       *objDescription;
144         Relation        depRel;
145         ObjectAddresses oktodelete;
146
147         /*
148          * Get object description for possible use in failure message. Must do
149          * this before deleting it ...
150          */
151         objDescription = getObjectDescription(object);
152
153         /*
154          * We save some cycles by opening pg_depend just once and passing the
155          * Relation pointer down to all the recursive deletion steps.
156          */
157         depRel = heap_openr(DependRelationName, RowExclusiveLock);
158
159         /*
160          * Construct a list of objects that are reachable by AUTO or INTERNAL
161          * dependencies from the target object.  These should be deleted
162          * silently, even if the actual deletion pass first reaches one of
163          * them via a non-auto dependency.
164          */
165         init_object_addresses(&oktodelete);
166
167         findAutoDeletableObjects(object, &oktodelete, depRel);
168
169         if (!recursiveDeletion(object, behavior, NOTICE,
170                                                    NULL, &oktodelete, depRel))
171                 ereport(ERROR,
172                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
173                           errmsg("cannot drop %s because other objects depend on it",
174                                          objDescription),
175                                  errhint("Use DROP ... CASCADE to drop the dependent objects too.")));
176
177         term_object_addresses(&oktodelete);
178
179         heap_close(depRel, RowExclusiveLock);
180
181         pfree(objDescription);
182 }
183
184
185 /*
186  * deleteWhatDependsOn: attempt to drop everything that depends on the
187  * specified object, though not the object itself.      Behavior is always
188  * CASCADE.
189  *
190  * This is currently used only to clean out the contents of a schema
191  * (namespace): the passed object is a namespace.  We normally want this
192  * to be done silently, so there's an option to suppress NOTICE messages.
193  */
194 void
195 deleteWhatDependsOn(const ObjectAddress *object,
196                                         bool showNotices)
197 {
198         char       *objDescription;
199         Relation        depRel;
200         ObjectAddresses oktodelete;
201
202         /*
203          * Get object description for possible use in failure messages
204          */
205         objDescription = getObjectDescription(object);
206
207         /*
208          * We save some cycles by opening pg_depend just once and passing the
209          * Relation pointer down to all the recursive deletion steps.
210          */
211         depRel = heap_openr(DependRelationName, RowExclusiveLock);
212
213         /*
214          * Construct a list of objects that are reachable by AUTO or INTERNAL
215          * dependencies from the target object.  These should be deleted
216          * silently, even if the actual deletion pass first reaches one of
217          * them via a non-auto dependency.
218          */
219         init_object_addresses(&oktodelete);
220
221         findAutoDeletableObjects(object, &oktodelete, depRel);
222
223         /*
224          * Now invoke only step 2 of recursiveDeletion: just recurse to the
225          * stuff dependent on the given object.
226          */
227         if (!deleteDependentObjects(object, objDescription,
228                                                                 DROP_CASCADE,
229                                                                 showNotices ? NOTICE : DEBUG2,
230                                                                 &oktodelete, depRel))
231                 ereport(ERROR,
232                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
233                                  errmsg("failed to drop all objects depending on %s",
234                                                 objDescription)));
235
236         /*
237          * We do not need CommandCounterIncrement here, since if step 2 did
238          * anything then each recursive call will have ended with one.
239          */
240
241         term_object_addresses(&oktodelete);
242
243         heap_close(depRel, RowExclusiveLock);
244
245         pfree(objDescription);
246 }
247
248
249 /*
250  * findAutoDeletableObjects: find all objects that are reachable by AUTO or
251  * INTERNAL dependency paths from the given object.  Add them all to the
252  * oktodelete list.  Note that the originally given object will also be
253  * added to the list.
254  *
255  * depRel is the already-open pg_depend relation.
256  */
257 static void
258 findAutoDeletableObjects(const ObjectAddress *object,
259                                                  ObjectAddresses *oktodelete,
260                                                  Relation depRel)
261 {
262         ScanKeyData key[3];
263         int                     nkeys;
264         SysScanDesc scan;
265         HeapTuple       tup;
266         ObjectAddress otherObject;
267
268         /*
269          * If this object is already in oktodelete, then we already visited
270          * it; don't do so again (this prevents infinite recursion if there's
271          * a loop in pg_depend).  Otherwise, add it.
272          */
273         if (object_address_present(object, oktodelete))
274                 return;
275         add_exact_object_address(object, oktodelete);
276
277         /*
278          * Scan pg_depend records that link to this object, showing the things
279          * that depend on it.  For each one that is AUTO or INTERNAL, visit
280          * the referencing object.
281          *
282          * When dropping a whole object (subId = 0), find pg_depend records for
283          * its sub-objects too.
284          */
285         ScanKeyInit(&key[0],
286                                 Anum_pg_depend_refclassid,
287                                 BTEqualStrategyNumber, F_OIDEQ,
288                                 ObjectIdGetDatum(object->classId));
289         ScanKeyInit(&key[1],
290                                 Anum_pg_depend_refobjid,
291                                 BTEqualStrategyNumber, F_OIDEQ,
292                                 ObjectIdGetDatum(object->objectId));
293         if (object->objectSubId != 0)
294         {
295                 ScanKeyInit(&key[2],
296                                         Anum_pg_depend_refobjsubid,
297                                         BTEqualStrategyNumber, F_INT4EQ,
298                                         Int32GetDatum(object->objectSubId));
299                 nkeys = 3;
300         }
301         else
302                 nkeys = 2;
303
304         scan = systable_beginscan(depRel, DependReferenceIndex, true,
305                                                           SnapshotNow, nkeys, key);
306
307         while (HeapTupleIsValid(tup = systable_getnext(scan)))
308         {
309                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
310
311                 switch (foundDep->deptype)
312                 {
313                         case DEPENDENCY_NORMAL:
314                                 /* ignore */
315                                 break;
316                         case DEPENDENCY_AUTO:
317                         case DEPENDENCY_INTERNAL:
318                                 /* recurse */
319                                 otherObject.classId = foundDep->classid;
320                                 otherObject.objectId = foundDep->objid;
321                                 otherObject.objectSubId = foundDep->objsubid;
322                                 findAutoDeletableObjects(&otherObject, oktodelete, depRel);
323                                 break;
324                         case DEPENDENCY_PIN:
325
326                                 /*
327                                  * For a PIN dependency we just ereport immediately; there
328                                  * won't be any others to examine, and we aren't ever
329                                  * going to let the user delete it.
330                                  */
331                                 ereport(ERROR,
332                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
333                                                  errmsg("cannot drop %s because it is required by the database system",
334                                                                 getObjectDescription(object))));
335                                 break;
336                         default:
337                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
338                                          foundDep->deptype, getObjectDescription(object));
339                                 break;
340                 }
341         }
342
343         systable_endscan(scan);
344 }
345
346
347 /*
348  * recursiveDeletion: delete a single object for performDeletion, plus
349  * (recursively) anything that depends on it.
350  *
351  * Returns TRUE if successful, FALSE if not.
352  *
353  * callingObject is NULL at the outer level, else identifies the object that
354  * we recursed from (the reference object that someone else needs to delete).
355  *
356  * oktodelete is a list of objects verified deletable (ie, reachable by one
357  * or more AUTO or INTERNAL dependencies from the original target).
358  *
359  * depRel is the already-open pg_depend relation.
360  *
361  *
362  * In RESTRICT mode, we perform all the deletions anyway, but ereport a message
363  * and return FALSE if we find a restriction violation.  performDeletion
364  * will then abort the transaction to nullify the deletions.  We have to
365  * do it this way to (a) report all the direct and indirect dependencies
366  * while (b) not going into infinite recursion if there's a cycle.
367  *
368  * This is even more complex than one could wish, because it is possible for
369  * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL
370  * dependencies.  Also, we might have a situation where we've been asked to
371  * delete object A, and objects B and C both have AUTO dependencies on A,
372  * but B also has a NORMAL dependency on C.  (Since any of these paths might
373  * be indirect, we can't prevent these scenarios, but must cope instead.)
374  * If we visit C before B then we would mistakenly decide that the B->C link
375  * should prevent the restricted drop from occurring.  To handle this, we make
376  * a pre-scan to find all the objects that are auto-deletable from A.  If we
377  * visit C first, but B is present in the oktodelete list, then we make no
378  * complaint but recurse to delete B anyway.  (Note that in general we must
379  * delete B before deleting C; the drop routine for B may try to access C.)
380  *
381  * Note: in the case where the path to B is traversed first, we will not
382  * see the NORMAL dependency when we reach C, because of the pg_depend
383  * removals done in step 1.  The oktodelete list is necessary just
384  * to make the behavior independent of the order in which pg_depend
385  * entries are visited.
386  */
387 static bool
388 recursiveDeletion(const ObjectAddress *object,
389                                   DropBehavior behavior,
390                                   int msglevel,
391                                   const ObjectAddress *callingObject,
392                                   ObjectAddresses *oktodelete,
393                                   Relation depRel)
394 {
395         bool            ok = true;
396         char       *objDescription;
397         ScanKeyData key[3];
398         int                     nkeys;
399         SysScanDesc scan;
400         HeapTuple       tup;
401         ObjectAddress otherObject;
402         ObjectAddress owningObject;
403         bool            amOwned = false;
404
405         /*
406          * Get object description for possible use in messages.  Must do this
407          * before deleting it ...
408          */
409         objDescription = getObjectDescription(object);
410
411         /*
412          * Step 1: find and remove pg_depend records that link from this
413          * object to others.  We have to do this anyway, and doing it first
414          * ensures that we avoid infinite recursion in the case of cycles.
415          * Also, some dependency types require extra processing here.
416          *
417          * When dropping a whole object (subId = 0), remove all pg_depend records
418          * for its sub-objects too.
419          */
420         ScanKeyInit(&key[0],
421                                 Anum_pg_depend_classid,
422                                 BTEqualStrategyNumber, F_OIDEQ,
423                                 ObjectIdGetDatum(object->classId));
424         ScanKeyInit(&key[1],
425                                 Anum_pg_depend_objid,
426                                 BTEqualStrategyNumber, F_OIDEQ,
427                                 ObjectIdGetDatum(object->objectId));
428         if (object->objectSubId != 0)
429         {
430                 ScanKeyInit(&key[2],
431                                         Anum_pg_depend_objsubid,
432                                         BTEqualStrategyNumber, F_INT4EQ,
433                                         Int32GetDatum(object->objectSubId));
434                 nkeys = 3;
435         }
436         else
437                 nkeys = 2;
438
439         scan = systable_beginscan(depRel, DependDependerIndex, true,
440                                                           SnapshotNow, nkeys, key);
441
442         while (HeapTupleIsValid(tup = systable_getnext(scan)))
443         {
444                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
445
446                 otherObject.classId = foundDep->refclassid;
447                 otherObject.objectId = foundDep->refobjid;
448                 otherObject.objectSubId = foundDep->refobjsubid;
449
450                 switch (foundDep->deptype)
451                 {
452                         case DEPENDENCY_NORMAL:
453                         case DEPENDENCY_AUTO:
454                                 /* no problem */
455                                 break;
456                         case DEPENDENCY_INTERNAL:
457
458                                 /*
459                                  * This object is part of the internal implementation of
460                                  * another object.      We have three cases:
461                                  *
462                                  * 1. At the outermost recursion level, disallow the DROP.
463                                  * (We just ereport here, rather than proceeding, since no
464                                  * other dependencies are likely to be interesting.)
465                                  */
466                                 if (callingObject == NULL)
467                                 {
468                                         char       *otherObjDesc = getObjectDescription(&otherObject);
469
470                                         ereport(ERROR,
471                                                  (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
472                                                   errmsg("cannot drop %s because %s requires it",
473                                                                  objDescription, otherObjDesc),
474                                                   errhint("You may drop %s instead.",
475                                                                   otherObjDesc)));
476                                 }
477
478                                 /*
479                                  * 2. When recursing from the other end of this
480                                  * dependency, it's okay to continue with the deletion.
481                                  * This holds when recursing from a whole object that
482                                  * includes the nominal other end as a component, too.
483                                  */
484                                 if (callingObject->classId == otherObject.classId &&
485                                         callingObject->objectId == otherObject.objectId &&
486                                 (callingObject->objectSubId == otherObject.objectSubId ||
487                                  callingObject->objectSubId == 0))
488                                         break;
489
490                                 /*
491                                  * 3. When recursing from anyplace else, transform this
492                                  * deletion request into a delete of the other object.
493                                  * (This will be an error condition iff RESTRICT mode.) In
494                                  * this case we finish deleting my dependencies except for
495                                  * the INTERNAL link, which will be needed to cause the
496                                  * owning object to recurse back to me.
497                                  */
498                                 if (amOwned)    /* shouldn't happen */
499                                         elog(ERROR, "multiple INTERNAL dependencies for %s",
500                                                  objDescription);
501                                 owningObject = otherObject;
502                                 amOwned = true;
503                                 /* "continue" bypasses the simple_heap_delete call below */
504                                 continue;
505                         case DEPENDENCY_PIN:
506
507                                 /*
508                                  * Should not happen; PIN dependencies should have zeroes
509                                  * in the depender fields...
510                                  */
511                                 elog(ERROR, "incorrect use of PIN dependency with %s",
512                                          objDescription);
513                                 break;
514                         default:
515                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
516                                          foundDep->deptype, objDescription);
517                                 break;
518                 }
519
520                 simple_heap_delete(depRel, &tup->t_self);
521         }
522
523         systable_endscan(scan);
524
525         /*
526          * CommandCounterIncrement here to ensure that preceding changes are
527          * all visible; in particular, that the above deletions of pg_depend
528          * entries are visible.  That prevents infinite recursion in case of a
529          * dependency loop (which is perfectly legal).
530          */
531         CommandCounterIncrement();
532
533         /*
534          * If we found we are owned by another object, ask it to delete itself
535          * instead of proceeding.  Complain if RESTRICT mode, unless the other
536          * object is in oktodelete.
537          */
538         if (amOwned)
539         {
540                 if (object_address_present(&owningObject, oktodelete))
541                         ereport(DEBUG2,
542                                         (errmsg("drop auto-cascades to %s",
543                                                         getObjectDescription(&owningObject))));
544                 else if (behavior == DROP_RESTRICT)
545                 {
546                         ereport(msglevel,
547                                         (errmsg("%s depends on %s",
548                                                         getObjectDescription(&owningObject),
549                                                         objDescription)));
550                         ok = false;
551                 }
552                 else
553                         ereport(msglevel,
554                                         (errmsg("drop cascades to %s",
555                                                         getObjectDescription(&owningObject))));
556
557                 if (!recursiveDeletion(&owningObject, behavior, msglevel,
558                                                            object, oktodelete, depRel))
559                         ok = false;
560
561                 pfree(objDescription);
562
563                 return ok;
564         }
565
566         /*
567          * Step 2: scan pg_depend records that link to this object, showing
568          * the things that depend on it.  Recursively delete those things.
569          * Note it's important to delete the dependent objects before the
570          * referenced one, since the deletion routines might do things like
571          * try to update the pg_class record when deleting a check constraint.
572          */
573         if (!deleteDependentObjects(object, objDescription,
574                                                                 behavior, msglevel,
575                                                                 oktodelete, depRel))
576                 ok = false;
577
578         /*
579          * We do not need CommandCounterIncrement here, since if step 2 did
580          * anything then each recursive call will have ended with one.
581          */
582
583         /*
584          * Step 3: delete the object itself.
585          */
586         doDeletion(object);
587
588         /*
589          * Delete any comments associated with this object.  (This is a
590          * convenient place to do it instead of having every object type know
591          * to do it.)
592          */
593         DeleteComments(object->objectId, object->classId, object->objectSubId);
594
595         /*
596          * CommandCounterIncrement here to ensure that preceding changes are
597          * all visible.
598          */
599         CommandCounterIncrement();
600
601         /*
602          * And we're done!
603          */
604         pfree(objDescription);
605
606         return ok;
607 }
608
609
610 /*
611  * deleteDependentObjects - find and delete objects that depend on 'object'
612  *
613  * Scan pg_depend records that link to the given object, showing
614  * the things that depend on it.  Recursively delete those things. (We
615  * don't delete the pg_depend records here, as the recursive call will
616  * do that.)  Note it's important to delete the dependent objects
617  * before the referenced one, since the deletion routines might do
618  * things like try to update the pg_class record when deleting a check
619  * constraint.
620  *
621  * When dropping a whole object (subId = 0), find pg_depend records for
622  * its sub-objects too.
623  *
624  *      object: the object to find dependencies on
625  *      objDescription: description of object (only used for error messages)
626  *      behavior: desired drop behavior
627  *      oktodelete: stuff that's AUTO-deletable
628  *      depRel: already opened pg_depend relation
629  *
630  * Returns TRUE if all is well, false if any problem found.
631  *
632  * NOTE: because we are using SnapshotNow, if a recursive call deletes
633  * any pg_depend tuples that our scan hasn't yet visited, we will not
634  * see them as good when we do visit them.      This is essential for
635  * correct behavior if there are multiple dependency paths between two
636  * objects --- else we might try to delete an already-deleted object.
637  */
638 static bool
639 deleteDependentObjects(const ObjectAddress *object,
640                                            const char *objDescription,
641                                            DropBehavior behavior,
642                                            int msglevel,
643                                            ObjectAddresses *oktodelete,
644                                            Relation depRel)
645 {
646         bool            ok = true;
647         ScanKeyData key[3];
648         int                     nkeys;
649         SysScanDesc scan;
650         HeapTuple       tup;
651         ObjectAddress otherObject;
652
653         ScanKeyInit(&key[0],
654                                 Anum_pg_depend_refclassid,
655                                 BTEqualStrategyNumber, F_OIDEQ,
656                                 ObjectIdGetDatum(object->classId));
657         ScanKeyInit(&key[1],
658                                 Anum_pg_depend_refobjid,
659                                 BTEqualStrategyNumber, F_OIDEQ,
660                                 ObjectIdGetDatum(object->objectId));
661         if (object->objectSubId != 0)
662         {
663                 ScanKeyInit(&key[2],
664                                         Anum_pg_depend_refobjsubid,
665                                         BTEqualStrategyNumber, F_INT4EQ,
666                                         Int32GetDatum(object->objectSubId));
667                 nkeys = 3;
668         }
669         else
670                 nkeys = 2;
671
672         scan = systable_beginscan(depRel, DependReferenceIndex, true,
673                                                           SnapshotNow, nkeys, key);
674
675         while (HeapTupleIsValid(tup = systable_getnext(scan)))
676         {
677                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
678
679                 otherObject.classId = foundDep->classid;
680                 otherObject.objectId = foundDep->objid;
681                 otherObject.objectSubId = foundDep->objsubid;
682
683                 switch (foundDep->deptype)
684                 {
685                         case DEPENDENCY_NORMAL:
686
687                                 /*
688                                  * Perhaps there was another dependency path that would
689                                  * have allowed silent deletion of the otherObject, had we
690                                  * only taken that path first. In that case, act like this
691                                  * link is AUTO, too.
692                                  */
693                                 if (object_address_present(&otherObject, oktodelete))
694                                         ereport(DEBUG2,
695                                                         (errmsg("drop auto-cascades to %s",
696                                                                         getObjectDescription(&otherObject))));
697                                 else if (behavior == DROP_RESTRICT)
698                                 {
699                                         ereport(msglevel,
700                                                         (errmsg("%s depends on %s",
701                                                                         getObjectDescription(&otherObject),
702                                                                         objDescription)));
703                                         ok = false;
704                                 }
705                                 else
706                                         ereport(msglevel,
707                                                         (errmsg("drop cascades to %s",
708                                                                         getObjectDescription(&otherObject))));
709
710                                 if (!recursiveDeletion(&otherObject, behavior, msglevel,
711                                                                            object, oktodelete, depRel))
712                                         ok = false;
713                                 break;
714                         case DEPENDENCY_AUTO:
715                         case DEPENDENCY_INTERNAL:
716
717                                 /*
718                                  * We propagate the DROP without complaint even in the
719                                  * RESTRICT case.  (However, normal dependencies on the
720                                  * component object could still cause failure.)
721                                  */
722                                 ereport(DEBUG2,
723                                                 (errmsg("drop auto-cascades to %s",
724                                                                 getObjectDescription(&otherObject))));
725
726                                 if (!recursiveDeletion(&otherObject, behavior, msglevel,
727                                                                            object, oktodelete, depRel))
728                                         ok = false;
729                                 break;
730                         case DEPENDENCY_PIN:
731
732                                 /*
733                                  * For a PIN dependency we just ereport immediately; there
734                                  * won't be any others to report.
735                                  */
736                                 ereport(ERROR,
737                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
738                                                  errmsg("cannot drop %s because it is required by the database system",
739                                                                 objDescription)));
740                                 break;
741                         default:
742                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
743                                          foundDep->deptype, objDescription);
744                                 break;
745                 }
746         }
747
748         systable_endscan(scan);
749
750         return ok;
751 }
752
753
754 /*
755  * doDeletion: actually delete a single object
756  */
757 static void
758 doDeletion(const ObjectAddress *object)
759 {
760         switch (getObjectClass(object))
761         {
762                 case OCLASS_CLASS:
763                         {
764                                 char            relKind = get_rel_relkind(object->objectId);
765
766                                 if (relKind == RELKIND_INDEX)
767                                 {
768                                         Assert(object->objectSubId == 0);
769                                         index_drop(object->objectId);
770                                 }
771                                 else
772                                 {
773                                         if (object->objectSubId != 0)
774                                                 RemoveAttributeById(object->objectId,
775                                                                                         object->objectSubId);
776                                         else
777                                                 heap_drop_with_catalog(object->objectId);
778                                 }
779                                 break;
780                         }
781
782                 case OCLASS_PROC:
783                         RemoveFunctionById(object->objectId);
784                         break;
785
786                 case OCLASS_TYPE:
787                         RemoveTypeById(object->objectId);
788                         break;
789
790                 case OCLASS_CAST:
791                         DropCastById(object->objectId);
792                         break;
793
794                 case OCLASS_CONSTRAINT:
795                         RemoveConstraintById(object->objectId);
796                         break;
797
798                 case OCLASS_CONVERSION:
799                         RemoveConversionById(object->objectId);
800                         break;
801
802                 case OCLASS_DEFAULT:
803                         RemoveAttrDefaultById(object->objectId);
804                         break;
805
806                 case OCLASS_LANGUAGE:
807                         DropProceduralLanguageById(object->objectId);
808                         break;
809
810                 case OCLASS_OPERATOR:
811                         RemoveOperatorById(object->objectId);
812                         break;
813
814                 case OCLASS_OPCLASS:
815                         RemoveOpClassById(object->objectId);
816                         break;
817
818                 case OCLASS_REWRITE:
819                         RemoveRewriteRuleById(object->objectId);
820                         break;
821
822                 case OCLASS_TRIGGER:
823                         RemoveTriggerById(object->objectId);
824                         break;
825
826                 case OCLASS_SCHEMA:
827                         RemoveSchemaById(object->objectId);
828                         break;
829
830                 default:
831                         elog(ERROR, "unrecognized object class: %u",
832                                  object->classId);
833         }
834 }
835
836 /*
837  * recordDependencyOnExpr - find expression dependencies
838  *
839  * This is used to find the dependencies of rules, constraint expressions,
840  * etc.
841  *
842  * Given an expression or query in node-tree form, find all the objects
843  * it refers to (tables, columns, operators, functions, etc).  Record
844  * a dependency of the specified type from the given depender object
845  * to each object mentioned in the expression.
846  *
847  * rtable is the rangetable to be used to interpret Vars with varlevelsup=0.
848  * It can be NIL if no such variables are expected.
849  *
850  * XXX is it important to create dependencies on the datatypes mentioned in
851  * the expression?      In most cases this would be redundant (eg, a ref to an
852  * operator indirectly references its input and output datatypes), but I'm
853  * not quite convinced there are no cases where we need it.
854  */
855 void
856 recordDependencyOnExpr(const ObjectAddress *depender,
857                                            Node *expr, List *rtable,
858                                            DependencyType behavior)
859 {
860         find_expr_references_context context;
861
862         init_object_addresses(&context.addrs);
863
864         /* Set up interpretation for Vars at varlevelsup = 0 */
865         context.rtables = makeList1(rtable);
866
867         /* Scan the expression tree for referenceable objects */
868         find_expr_references_walker(expr, &context);
869
870         /* Remove any duplicates */
871         eliminate_duplicate_dependencies(&context.addrs);
872
873         /* And record 'em */
874         recordMultipleDependencies(depender,
875                                                            context.addrs.refs, context.addrs.numrefs,
876                                                            behavior);
877
878         term_object_addresses(&context.addrs);
879 }
880
881 /*
882  * recordDependencyOnSingleRelExpr - find expression dependencies
883  *
884  * As above, but only one relation is expected to be referenced (with
885  * varno = 1 and varlevelsup = 0).      Pass the relation OID instead of a
886  * range table.  An additional frammish is that dependencies on that
887  * relation (or its component columns) will be marked with 'self_behavior',
888  * whereas 'behavior' is used for everything else.
889  */
890 void
891 recordDependencyOnSingleRelExpr(const ObjectAddress *depender,
892                                                                 Node *expr, Oid relId,
893                                                                 DependencyType behavior,
894                                                                 DependencyType self_behavior)
895 {
896         find_expr_references_context context;
897         RangeTblEntry rte;
898
899         init_object_addresses(&context.addrs);
900
901         /* We gin up a rather bogus rangetable list to handle Vars */
902         MemSet(&rte, 0, sizeof(rte));
903         rte.type = T_RangeTblEntry;
904         rte.rtekind = RTE_RELATION;
905         rte.relid = relId;
906
907         context.rtables = makeList1(makeList1(&rte));
908
909         /* Scan the expression tree for referenceable objects */
910         find_expr_references_walker(expr, &context);
911
912         /* Remove any duplicates */
913         eliminate_duplicate_dependencies(&context.addrs);
914
915         /* Separate self-dependencies if necessary */
916         if (behavior != self_behavior && context.addrs.numrefs > 0)
917         {
918                 ObjectAddresses self_addrs;
919                 ObjectAddress *outobj;
920                 int                     oldref,
921                                         outrefs;
922
923                 init_object_addresses(&self_addrs);
924
925                 outobj = context.addrs.refs;
926                 outrefs = 0;
927                 for (oldref = 0; oldref < context.addrs.numrefs; oldref++)
928                 {
929                         ObjectAddress *thisobj = context.addrs.refs + oldref;
930
931                         if (thisobj->classId == RelOid_pg_class &&
932                                 thisobj->objectId == relId)
933                         {
934                                 /* Move this ref into self_addrs */
935                                 add_object_address(OCLASS_CLASS, relId, thisobj->objectSubId,
936                                                                    &self_addrs);
937                         }
938                         else
939                         {
940                                 /* Keep it in context.addrs */
941                                 outobj->classId = thisobj->classId;
942                                 outobj->objectId = thisobj->objectId;
943                                 outobj->objectSubId = thisobj->objectSubId;
944                                 outobj++;
945                                 outrefs++;
946                         }
947                 }
948                 context.addrs.numrefs = outrefs;
949
950                 /* Record the self-dependencies */
951                 recordMultipleDependencies(depender,
952                                                                    self_addrs.refs, self_addrs.numrefs,
953                                                                    self_behavior);
954
955                 term_object_addresses(&self_addrs);
956         }
957
958         /* Record the external dependencies */
959         recordMultipleDependencies(depender,
960                                                            context.addrs.refs, context.addrs.numrefs,
961                                                            behavior);
962
963         term_object_addresses(&context.addrs);
964 }
965
966 /*
967  * Recursively search an expression tree for object references.
968  *
969  * Note: we avoid creating references to columns of tables that participate
970  * in an SQL JOIN construct, but are not actually used anywhere in the query.
971  * To do so, we do not scan the joinaliasvars list of a join RTE while
972  * scanning the query rangetable, but instead scan each individual entry
973  * of the alias list when we find a reference to it.
974  */
975 static bool
976 find_expr_references_walker(Node *node,
977                                                         find_expr_references_context *context)
978 {
979         if (node == NULL)
980                 return false;
981         if (IsA(node, Var))
982         {
983                 Var                *var = (Var *) node;
984                 int                     levelsup;
985                 List       *rtable,
986                                    *rtables;
987                 RangeTblEntry *rte;
988
989                 /* Find matching rtable entry, or complain if not found */
990                 levelsup = var->varlevelsup;
991                 rtables = context->rtables;
992                 while (levelsup--)
993                 {
994                         if (rtables == NIL)
995                                 break;
996                         rtables = lnext(rtables);
997                 }
998                 if (rtables == NIL)
999                         elog(ERROR, "invalid varlevelsup %d", var->varlevelsup);
1000                 rtable = lfirst(rtables);
1001                 if (var->varno <= 0 || var->varno > length(rtable))
1002                         elog(ERROR, "invalid varno %d", var->varno);
1003                 rte = rt_fetch(var->varno, rtable);
1004                 if (rte->rtekind == RTE_RELATION)
1005                 {
1006                         /* If it's a plain relation, reference this column */
1007                         /* NB: this code works for whole-row Var with attno 0, too */
1008                         add_object_address(OCLASS_CLASS, rte->relid, var->varattno,
1009                                                            &context->addrs);
1010                 }
1011                 else if (rte->rtekind == RTE_JOIN)
1012                 {
1013                         /* Scan join output column to add references to join inputs */
1014                         List       *save_rtables;
1015
1016                         /* We must make the context appropriate for join's level */
1017                         save_rtables = context->rtables;
1018                         context->rtables = rtables;
1019                         if (var->varattno <= 0 ||
1020                                 var->varattno > length(rte->joinaliasvars))
1021                                 elog(ERROR, "invalid varattno %d", var->varattno);
1022                         find_expr_references_walker((Node *) nth(var->varattno - 1,
1023                                                                                                          rte->joinaliasvars),
1024                                                                                 context);
1025                         context->rtables = save_rtables;
1026                 }
1027                 return false;
1028         }
1029         if (IsA(node, FuncExpr))
1030         {
1031                 FuncExpr   *funcexpr = (FuncExpr *) node;
1032
1033                 add_object_address(OCLASS_PROC, funcexpr->funcid, 0,
1034                                                    &context->addrs);
1035                 /* fall through to examine arguments */
1036         }
1037         if (IsA(node, OpExpr))
1038         {
1039                 OpExpr     *opexpr = (OpExpr *) node;
1040
1041                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1042                                                    &context->addrs);
1043                 /* fall through to examine arguments */
1044         }
1045         if (IsA(node, DistinctExpr))
1046         {
1047                 DistinctExpr *distinctexpr = (DistinctExpr *) node;
1048
1049                 add_object_address(OCLASS_OPERATOR, distinctexpr->opno, 0,
1050                                                    &context->addrs);
1051                 /* fall through to examine arguments */
1052         }
1053         if (IsA(node, ScalarArrayOpExpr))
1054         {
1055                 ScalarArrayOpExpr *opexpr = (ScalarArrayOpExpr *) node;
1056
1057                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1058                                                    &context->addrs);
1059                 /* fall through to examine arguments */
1060         }
1061         if (IsA(node, NullIfExpr))
1062         {
1063                 NullIfExpr *nullifexpr = (NullIfExpr *) node;
1064
1065                 add_object_address(OCLASS_OPERATOR, nullifexpr->opno, 0,
1066                                                    &context->addrs);
1067                 /* fall through to examine arguments */
1068         }
1069         if (IsA(node, Aggref))
1070         {
1071                 Aggref     *aggref = (Aggref *) node;
1072
1073                 add_object_address(OCLASS_PROC, aggref->aggfnoid, 0,
1074                                                    &context->addrs);
1075                 /* fall through to examine arguments */
1076         }
1077         if (IsA(node, SubLink))
1078         {
1079                 SubLink    *sublink = (SubLink *) node;
1080                 List       *opid;
1081
1082                 foreach(opid, sublink->operOids)
1083                 {
1084                         add_object_address(OCLASS_OPERATOR, lfirsto(opid), 0,
1085                                                            &context->addrs);
1086                 }
1087                 /* fall through to examine arguments */
1088         }
1089         if (is_subplan(node))
1090         {
1091                 /* Extra work needed here if we ever need this case */
1092                 elog(ERROR, "already-planned subqueries not supported");
1093         }
1094         if (IsA(node, Query))
1095         {
1096                 /* Recurse into RTE subquery or not-yet-planned sublink subquery */
1097                 Query      *query = (Query *) node;
1098                 List       *rtable;
1099                 bool            result;
1100
1101                 /*
1102                  * Add whole-relation refs for each plain relation mentioned in
1103                  * the subquery's rtable.  (Note: query_tree_walker takes care of
1104                  * recursing into RTE_FUNCTION and RTE_SUBQUERY RTEs, so no need
1105                  * to do that here.  But keep it from looking at join alias
1106                  * lists.)
1107                  */
1108                 foreach(rtable, query->rtable)
1109                 {
1110                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
1111
1112                         if (rte->rtekind == RTE_RELATION)
1113                                 add_object_address(OCLASS_CLASS, rte->relid, 0,
1114                                                                    &context->addrs);
1115                 }
1116
1117                 /* Examine substructure of query */
1118                 context->rtables = lcons(query->rtable, context->rtables);
1119                 result = query_tree_walker(query,
1120                                                                    find_expr_references_walker,
1121                                                                    (void *) context,
1122                                                                    QTW_IGNORE_JOINALIASES);
1123                 context->rtables = lnext(context->rtables);
1124                 return result;
1125         }
1126         return expression_tree_walker(node, find_expr_references_walker,
1127                                                                   (void *) context);
1128 }
1129
1130 /*
1131  * Given an array of dependency references, eliminate any duplicates.
1132  */
1133 static void
1134 eliminate_duplicate_dependencies(ObjectAddresses *addrs)
1135 {
1136         ObjectAddress *priorobj;
1137         int                     oldref,
1138                                 newrefs;
1139
1140         if (addrs->numrefs <= 1)
1141                 return;                                 /* nothing to do */
1142
1143         /* Sort the refs so that duplicates are adjacent */
1144         qsort((void *) addrs->refs, addrs->numrefs, sizeof(ObjectAddress),
1145                   object_address_comparator);
1146
1147         /* Remove dups */
1148         priorobj = addrs->refs;
1149         newrefs = 1;
1150         for (oldref = 1; oldref < addrs->numrefs; oldref++)
1151         {
1152                 ObjectAddress *thisobj = addrs->refs + oldref;
1153
1154                 if (priorobj->classId == thisobj->classId &&
1155                         priorobj->objectId == thisobj->objectId)
1156                 {
1157                         if (priorobj->objectSubId == thisobj->objectSubId)
1158                                 continue;               /* identical, so drop thisobj */
1159
1160                         /*
1161                          * If we have a whole-object reference and a reference to a
1162                          * part of the same object, we don't need the whole-object
1163                          * reference (for example, we don't need to reference both
1164                          * table foo and column foo.bar).  The whole-object reference
1165                          * will always appear first in the sorted list.
1166                          */
1167                         if (priorobj->objectSubId == 0)
1168                         {
1169                                 /* replace whole ref with partial */
1170                                 priorobj->objectSubId = thisobj->objectSubId;
1171                                 continue;
1172                         }
1173                 }
1174                 /* Not identical, so add thisobj to output set */
1175                 priorobj++;
1176                 priorobj->classId = thisobj->classId;
1177                 priorobj->objectId = thisobj->objectId;
1178                 priorobj->objectSubId = thisobj->objectSubId;
1179                 newrefs++;
1180         }
1181
1182         addrs->numrefs = newrefs;
1183 }
1184
1185 /*
1186  * qsort comparator for ObjectAddress items
1187  */
1188 static int
1189 object_address_comparator(const void *a, const void *b)
1190 {
1191         const ObjectAddress *obja = (const ObjectAddress *) a;
1192         const ObjectAddress *objb = (const ObjectAddress *) b;
1193
1194         if (obja->classId < objb->classId)
1195                 return -1;
1196         if (obja->classId > objb->classId)
1197                 return 1;
1198         if (obja->objectId < objb->objectId)
1199                 return -1;
1200         if (obja->objectId > objb->objectId)
1201                 return 1;
1202
1203         /*
1204          * We sort the subId as an unsigned int so that 0 will come first. See
1205          * logic in eliminate_duplicate_dependencies.
1206          */
1207         if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId)
1208                 return -1;
1209         if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId)
1210                 return 1;
1211         return 0;
1212 }
1213
1214 /*
1215  * Routines for handling an expansible array of ObjectAddress items.
1216  *
1217  * init_object_addresses: initialize an ObjectAddresses array.
1218  */
1219 static void
1220 init_object_addresses(ObjectAddresses *addrs)
1221 {
1222         /* Initialize array to empty */
1223         addrs->numrefs = 0;
1224         addrs->maxrefs = 32;            /* arbitrary initial array size */
1225         addrs->refs = (ObjectAddress *)
1226                 palloc(addrs->maxrefs * sizeof(ObjectAddress));
1227
1228         /* Initialize object_classes[] if not done yet */
1229         /* This will be needed by add_object_address() */
1230         if (!object_classes_initialized)
1231                 init_object_classes();
1232 }
1233
1234 /*
1235  * Add an entry to an ObjectAddresses array.
1236  *
1237  * It is convenient to specify the class by ObjectClass rather than directly
1238  * by catalog OID.
1239  */
1240 static void
1241 add_object_address(ObjectClasses oclass, Oid objectId, int32 subId,
1242                                    ObjectAddresses *addrs)
1243 {
1244         ObjectAddress *item;
1245
1246         /* enlarge array if needed */
1247         if (addrs->numrefs >= addrs->maxrefs)
1248         {
1249                 addrs->maxrefs *= 2;
1250                 addrs->refs = (ObjectAddress *)
1251                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1252         }
1253         /* record this item */
1254         item = addrs->refs + addrs->numrefs;
1255         item->classId = object_classes[oclass];
1256         item->objectId = objectId;
1257         item->objectSubId = subId;
1258         addrs->numrefs++;
1259 }
1260
1261 /*
1262  * Add an entry to an ObjectAddresses array.
1263  *
1264  * As above, but specify entry exactly.
1265  */
1266 static void
1267 add_exact_object_address(const ObjectAddress *object,
1268                                                  ObjectAddresses *addrs)
1269 {
1270         ObjectAddress *item;
1271
1272         /* enlarge array if needed */
1273         if (addrs->numrefs >= addrs->maxrefs)
1274         {
1275                 addrs->maxrefs *= 2;
1276                 addrs->refs = (ObjectAddress *)
1277                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1278         }
1279         /* record this item */
1280         item = addrs->refs + addrs->numrefs;
1281         *item = *object;
1282         addrs->numrefs++;
1283 }
1284
1285 /*
1286  * Test whether an object is present in an ObjectAddresses array.
1287  *
1288  * We return "true" if object is a subobject of something in the array, too.
1289  */
1290 static bool
1291 object_address_present(const ObjectAddress *object,
1292                                            ObjectAddresses *addrs)
1293 {
1294         int                     i;
1295
1296         for (i = addrs->numrefs - 1; i >= 0; i--)
1297         {
1298                 ObjectAddress *thisobj = addrs->refs + i;
1299
1300                 if (object->classId == thisobj->classId &&
1301                         object->objectId == thisobj->objectId)
1302                 {
1303                         if (object->objectSubId == thisobj->objectSubId ||
1304                                 thisobj->objectSubId == 0)
1305                                 return true;
1306                 }
1307         }
1308
1309         return false;
1310 }
1311
1312 /*
1313  * Clean up when done with an ObjectAddresses array.
1314  */
1315 static void
1316 term_object_addresses(ObjectAddresses *addrs)
1317 {
1318         pfree(addrs->refs);
1319 }
1320
1321 /*
1322  * Initialize the object_classes[] table.
1323  *
1324  * Although some of these OIDs aren't compile-time constants, they surely
1325  * shouldn't change during a backend's run.  So, we look them up the
1326  * first time through and then cache them.
1327  */
1328 static void
1329 init_object_classes(void)
1330 {
1331         object_classes[OCLASS_CLASS] = RelOid_pg_class;
1332         object_classes[OCLASS_PROC] = RelOid_pg_proc;
1333         object_classes[OCLASS_TYPE] = RelOid_pg_type;
1334         object_classes[OCLASS_CAST] = get_system_catalog_relid(CastRelationName);
1335         object_classes[OCLASS_CONSTRAINT] = get_system_catalog_relid(ConstraintRelationName);
1336         object_classes[OCLASS_CONVERSION] = get_system_catalog_relid(ConversionRelationName);
1337         object_classes[OCLASS_DEFAULT] = get_system_catalog_relid(AttrDefaultRelationName);
1338         object_classes[OCLASS_LANGUAGE] = get_system_catalog_relid(LanguageRelationName);
1339         object_classes[OCLASS_OPERATOR] = get_system_catalog_relid(OperatorRelationName);
1340         object_classes[OCLASS_OPCLASS] = get_system_catalog_relid(OperatorClassRelationName);
1341         object_classes[OCLASS_REWRITE] = get_system_catalog_relid(RewriteRelationName);
1342         object_classes[OCLASS_TRIGGER] = get_system_catalog_relid(TriggerRelationName);
1343         object_classes[OCLASS_SCHEMA] = get_system_catalog_relid(NamespaceRelationName);
1344         object_classes_initialized = true;
1345 }
1346
1347 /*
1348  * Determine the class of a given object identified by objectAddress.
1349  *
1350  * This function is needed just because some of the system catalogs do
1351  * not have hardwired-at-compile-time OIDs.
1352  */
1353 static ObjectClasses
1354 getObjectClass(const ObjectAddress *object)
1355 {
1356         /* Easy for the bootstrapped catalogs... */
1357         switch (object->classId)
1358         {
1359                 case RelOid_pg_class:
1360                         /* caller must check objectSubId */
1361                         return OCLASS_CLASS;
1362
1363                 case RelOid_pg_proc:
1364                         Assert(object->objectSubId == 0);
1365                         return OCLASS_PROC;
1366
1367                 case RelOid_pg_type:
1368                         Assert(object->objectSubId == 0);
1369                         return OCLASS_TYPE;
1370         }
1371
1372         /*
1373          * Handle cases where catalog's OID is not hardwired.
1374          */
1375         if (!object_classes_initialized)
1376                 init_object_classes();
1377
1378         if (object->classId == object_classes[OCLASS_CAST])
1379         {
1380                 Assert(object->objectSubId == 0);
1381                 return OCLASS_CAST;
1382         }
1383         if (object->classId == object_classes[OCLASS_CONSTRAINT])
1384         {
1385                 Assert(object->objectSubId == 0);
1386                 return OCLASS_CONSTRAINT;
1387         }
1388         if (object->classId == object_classes[OCLASS_CONVERSION])
1389         {
1390                 Assert(object->objectSubId == 0);
1391                 return OCLASS_CONVERSION;
1392         }
1393         if (object->classId == object_classes[OCLASS_DEFAULT])
1394         {
1395                 Assert(object->objectSubId == 0);
1396                 return OCLASS_DEFAULT;
1397         }
1398         if (object->classId == object_classes[OCLASS_LANGUAGE])
1399         {
1400                 Assert(object->objectSubId == 0);
1401                 return OCLASS_LANGUAGE;
1402         }
1403         if (object->classId == object_classes[OCLASS_OPERATOR])
1404         {
1405                 Assert(object->objectSubId == 0);
1406                 return OCLASS_OPERATOR;
1407         }
1408         if (object->classId == object_classes[OCLASS_OPCLASS])
1409         {
1410                 Assert(object->objectSubId == 0);
1411                 return OCLASS_OPCLASS;
1412         }
1413         if (object->classId == object_classes[OCLASS_REWRITE])
1414         {
1415                 Assert(object->objectSubId == 0);
1416                 return OCLASS_REWRITE;
1417         }
1418         if (object->classId == object_classes[OCLASS_TRIGGER])
1419         {
1420                 Assert(object->objectSubId == 0);
1421                 return OCLASS_TRIGGER;
1422         }
1423         if (object->classId == object_classes[OCLASS_SCHEMA])
1424         {
1425                 Assert(object->objectSubId == 0);
1426                 return OCLASS_SCHEMA;
1427         }
1428
1429         elog(ERROR, "unrecognized object class: %u", object->classId);
1430         return OCLASS_CLASS;            /* keep compiler quiet */
1431 }
1432
1433 /*
1434  * getObjectDescription: build an object description for messages
1435  *
1436  * The result is a palloc'd string.
1437  */
1438 static char *
1439 getObjectDescription(const ObjectAddress *object)
1440 {
1441         StringInfoData buffer;
1442
1443         initStringInfo(&buffer);
1444
1445         switch (getObjectClass(object))
1446         {
1447                 case OCLASS_CLASS:
1448                         getRelationDescription(&buffer, object->objectId);
1449                         if (object->objectSubId != 0)
1450                                 appendStringInfo(&buffer, " column %s",
1451                                                                  get_relid_attribute_name(object->objectId,
1452                                                                                                                   object->objectSubId));
1453                         break;
1454
1455                 case OCLASS_PROC:
1456                         appendStringInfo(&buffer, "function %s",
1457                                                          format_procedure(object->objectId));
1458                         break;
1459
1460                 case OCLASS_TYPE:
1461                         appendStringInfo(&buffer, "type %s",
1462                                                          format_type_be(object->objectId));
1463                         break;
1464
1465                 case OCLASS_CAST:
1466                         {
1467                                 Relation        castDesc;
1468                                 ScanKeyData skey[1];
1469                                 SysScanDesc rcscan;
1470                                 HeapTuple       tup;
1471                                 Form_pg_cast castForm;
1472
1473                                 castDesc = heap_openr(CastRelationName, AccessShareLock);
1474
1475                                 ScanKeyInit(&skey[0],
1476                                                         ObjectIdAttributeNumber,
1477                                                         BTEqualStrategyNumber, F_OIDEQ,
1478                                                         ObjectIdGetDatum(object->objectId));
1479
1480                                 rcscan = systable_beginscan(castDesc, CastOidIndex, true,
1481                                                                                         SnapshotNow, 1, skey);
1482
1483                                 tup = systable_getnext(rcscan);
1484
1485                                 if (!HeapTupleIsValid(tup))
1486                                         elog(ERROR, "could not find tuple for cast %u",
1487                                                  object->objectId);
1488
1489                                 castForm = (Form_pg_cast) GETSTRUCT(tup);
1490
1491                                 appendStringInfo(&buffer, "cast from %s to %s",
1492                                                                  format_type_be(castForm->castsource),
1493                                                                  format_type_be(castForm->casttarget));
1494
1495                                 systable_endscan(rcscan);
1496                                 heap_close(castDesc, AccessShareLock);
1497                                 break;
1498                         }
1499
1500                 case OCLASS_CONSTRAINT:
1501                         {
1502                                 Relation        conDesc;
1503                                 ScanKeyData skey[1];
1504                                 SysScanDesc rcscan;
1505                                 HeapTuple       tup;
1506                                 Form_pg_constraint con;
1507
1508                                 conDesc = heap_openr(ConstraintRelationName, AccessShareLock);
1509
1510                                 ScanKeyInit(&skey[0],
1511                                                         ObjectIdAttributeNumber,
1512                                                         BTEqualStrategyNumber, F_OIDEQ,
1513                                                         ObjectIdGetDatum(object->objectId));
1514
1515                                 rcscan = systable_beginscan(conDesc, ConstraintOidIndex, true,
1516                                                                                         SnapshotNow, 1, skey);
1517
1518                                 tup = systable_getnext(rcscan);
1519
1520                                 if (!HeapTupleIsValid(tup))
1521                                         elog(ERROR, "could not find tuple for constraint %u",
1522                                                  object->objectId);
1523
1524                                 con = (Form_pg_constraint) GETSTRUCT(tup);
1525
1526                                 if (OidIsValid(con->conrelid))
1527                                 {
1528                                         appendStringInfo(&buffer, "constraint %s on ",
1529                                                                          NameStr(con->conname));
1530                                         getRelationDescription(&buffer, con->conrelid);
1531                                 }
1532                                 else
1533                                 {
1534                                         appendStringInfo(&buffer, "constraint %s",
1535                                                                          NameStr(con->conname));
1536                                 }
1537
1538                                 systable_endscan(rcscan);
1539                                 heap_close(conDesc, AccessShareLock);
1540                                 break;
1541                         }
1542
1543                 case OCLASS_CONVERSION:
1544                         {
1545                                 HeapTuple       conTup;
1546
1547                                 conTup = SearchSysCache(CONOID,
1548                                                                           ObjectIdGetDatum(object->objectId),
1549                                                                                 0, 0, 0);
1550                                 if (!HeapTupleIsValid(conTup))
1551                                         elog(ERROR, "cache lookup failed for conversion %u",
1552                                                  object->objectId);
1553                                 appendStringInfo(&buffer, "conversion %s",
1554                                                                  NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname));
1555                                 ReleaseSysCache(conTup);
1556                                 break;
1557                         }
1558
1559                 case OCLASS_DEFAULT:
1560                         {
1561                                 Relation        attrdefDesc;
1562                                 ScanKeyData skey[1];
1563                                 SysScanDesc adscan;
1564                                 HeapTuple       tup;
1565                                 Form_pg_attrdef attrdef;
1566                                 ObjectAddress colobject;
1567
1568                                 attrdefDesc = heap_openr(AttrDefaultRelationName, AccessShareLock);
1569
1570                                 ScanKeyInit(&skey[0],
1571                                                         ObjectIdAttributeNumber,
1572                                                         BTEqualStrategyNumber, F_OIDEQ,
1573                                                         ObjectIdGetDatum(object->objectId));
1574
1575                                 adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndex,
1576                                                                                         true, SnapshotNow, 1, skey);
1577
1578                                 tup = systable_getnext(adscan);
1579
1580                                 if (!HeapTupleIsValid(tup))
1581                                         elog(ERROR, "could not find tuple for attrdef %u",
1582                                                  object->objectId);
1583
1584                                 attrdef = (Form_pg_attrdef) GETSTRUCT(tup);
1585
1586                                 colobject.classId = RelOid_pg_class;
1587                                 colobject.objectId = attrdef->adrelid;
1588                                 colobject.objectSubId = attrdef->adnum;
1589
1590                                 appendStringInfo(&buffer, "default for %s",
1591                                                                  getObjectDescription(&colobject));
1592
1593                                 systable_endscan(adscan);
1594                                 heap_close(attrdefDesc, AccessShareLock);
1595                                 break;
1596                         }
1597
1598                 case OCLASS_LANGUAGE:
1599                         {
1600                                 HeapTuple       langTup;
1601
1602                                 langTup = SearchSysCache(LANGOID,
1603                                                                           ObjectIdGetDatum(object->objectId),
1604                                                                                  0, 0, 0);
1605                                 if (!HeapTupleIsValid(langTup))
1606                                         elog(ERROR, "cache lookup failed for language %u",
1607                                                  object->objectId);
1608                                 appendStringInfo(&buffer, "language %s",
1609                                                                  NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname));
1610                                 ReleaseSysCache(langTup);
1611                                 break;
1612                         }
1613
1614                 case OCLASS_OPERATOR:
1615                         appendStringInfo(&buffer, "operator %s",
1616                                                          format_operator(object->objectId));
1617                         break;
1618
1619                 case OCLASS_OPCLASS:
1620                         {
1621                                 HeapTuple       opcTup;
1622                                 Form_pg_opclass opcForm;
1623                                 HeapTuple       amTup;
1624                                 Form_pg_am      amForm;
1625                                 char       *nspname;
1626
1627                                 opcTup = SearchSysCache(CLAOID,
1628                                                                           ObjectIdGetDatum(object->objectId),
1629                                                                                 0, 0, 0);
1630                                 if (!HeapTupleIsValid(opcTup))
1631                                         elog(ERROR, "cache lookup failed for opclass %u",
1632                                                  object->objectId);
1633                                 opcForm = (Form_pg_opclass) GETSTRUCT(opcTup);
1634
1635                                 /* Qualify the name if not visible in search path */
1636                                 if (OpclassIsVisible(object->objectId))
1637                                         nspname = NULL;
1638                                 else
1639                                         nspname = get_namespace_name(opcForm->opcnamespace);
1640
1641                                 appendStringInfo(&buffer, "operator class %s",
1642                                                                  quote_qualified_identifier(nspname,
1643                                                                                          NameStr(opcForm->opcname)));
1644
1645                                 amTup = SearchSysCache(AMOID,
1646                                                                            ObjectIdGetDatum(opcForm->opcamid),
1647                                                                            0, 0, 0);
1648                                 if (!HeapTupleIsValid(amTup))
1649                                         elog(ERROR, "cache lookup failed for access method %u",
1650                                                  opcForm->opcamid);
1651                                 amForm = (Form_pg_am) GETSTRUCT(amTup);
1652
1653                                 appendStringInfo(&buffer, " for %s",
1654                                                                  NameStr(amForm->amname));
1655
1656                                 ReleaseSysCache(amTup);
1657                                 ReleaseSysCache(opcTup);
1658                                 break;
1659                         }
1660
1661                 case OCLASS_REWRITE:
1662                         {
1663                                 Relation        ruleDesc;
1664                                 ScanKeyData skey[1];
1665                                 SysScanDesc rcscan;
1666                                 HeapTuple       tup;
1667                                 Form_pg_rewrite rule;
1668
1669                                 ruleDesc = heap_openr(RewriteRelationName, AccessShareLock);
1670
1671                                 ScanKeyInit(&skey[0],
1672                                                         ObjectIdAttributeNumber,
1673                                                         BTEqualStrategyNumber, F_OIDEQ,
1674                                                         ObjectIdGetDatum(object->objectId));
1675
1676                                 rcscan = systable_beginscan(ruleDesc, RewriteOidIndex, true,
1677                                                                                         SnapshotNow, 1, skey);
1678
1679                                 tup = systable_getnext(rcscan);
1680
1681                                 if (!HeapTupleIsValid(tup))
1682                                         elog(ERROR, "could not find tuple for rule %u",
1683                                                  object->objectId);
1684
1685                                 rule = (Form_pg_rewrite) GETSTRUCT(tup);
1686
1687                                 appendStringInfo(&buffer, "rule %s on ",
1688                                                                  NameStr(rule->rulename));
1689                                 getRelationDescription(&buffer, rule->ev_class);
1690
1691                                 systable_endscan(rcscan);
1692                                 heap_close(ruleDesc, AccessShareLock);
1693                                 break;
1694                         }
1695
1696                 case OCLASS_TRIGGER:
1697                         {
1698                                 Relation        trigDesc;
1699                                 ScanKeyData skey[1];
1700                                 SysScanDesc tgscan;
1701                                 HeapTuple       tup;
1702                                 Form_pg_trigger trig;
1703
1704                                 trigDesc = heap_openr(TriggerRelationName, AccessShareLock);
1705
1706                                 ScanKeyInit(&skey[0],
1707                                                         ObjectIdAttributeNumber,
1708                                                         BTEqualStrategyNumber, F_OIDEQ,
1709                                                         ObjectIdGetDatum(object->objectId));
1710
1711                                 tgscan = systable_beginscan(trigDesc, TriggerOidIndex, true,
1712                                                                                         SnapshotNow, 1, skey);
1713
1714                                 tup = systable_getnext(tgscan);
1715
1716                                 if (!HeapTupleIsValid(tup))
1717                                         elog(ERROR, "could not find tuple for trigger %u",
1718                                                  object->objectId);
1719
1720                                 trig = (Form_pg_trigger) GETSTRUCT(tup);
1721
1722                                 appendStringInfo(&buffer, "trigger %s on ",
1723                                                                  NameStr(trig->tgname));
1724                                 getRelationDescription(&buffer, trig->tgrelid);
1725
1726                                 systable_endscan(tgscan);
1727                                 heap_close(trigDesc, AccessShareLock);
1728                                 break;
1729                         }
1730
1731                 case OCLASS_SCHEMA:
1732                         {
1733                                 char       *nspname;
1734
1735                                 nspname = get_namespace_name(object->objectId);
1736                                 if (!nspname)
1737                                         elog(ERROR, "cache lookup failed for namespace %u",
1738                                                  object->objectId);
1739                                 appendStringInfo(&buffer, "schema %s", nspname);
1740                                 break;
1741                         }
1742
1743                 default:
1744                         appendStringInfo(&buffer, "unrecognized object %u %u %d",
1745                                                          object->classId,
1746                                                          object->objectId,
1747                                                          object->objectSubId);
1748                         break;
1749         }
1750
1751         return buffer.data;
1752 }
1753
1754 /*
1755  * subroutine for getObjectDescription: describe a relation
1756  */
1757 static void
1758 getRelationDescription(StringInfo buffer, Oid relid)
1759 {
1760         HeapTuple       relTup;
1761         Form_pg_class relForm;
1762         char       *nspname;
1763         char       *relname;
1764
1765         relTup = SearchSysCache(RELOID,
1766                                                         ObjectIdGetDatum(relid),
1767                                                         0, 0, 0);
1768         if (!HeapTupleIsValid(relTup))
1769                 elog(ERROR, "cache lookup failed for relation %u", relid);
1770         relForm = (Form_pg_class) GETSTRUCT(relTup);
1771
1772         /* Qualify the name if not visible in search path */
1773         if (RelationIsVisible(relid))
1774                 nspname = NULL;
1775         else
1776                 nspname = get_namespace_name(relForm->relnamespace);
1777
1778         relname = quote_qualified_identifier(nspname, NameStr(relForm->relname));
1779
1780         switch (relForm->relkind)
1781         {
1782                 case RELKIND_RELATION:
1783                         appendStringInfo(buffer, "table %s",
1784                                                          relname);
1785                         break;
1786                 case RELKIND_INDEX:
1787                         appendStringInfo(buffer, "index %s",
1788                                                          relname);
1789                         break;
1790                 case RELKIND_SPECIAL:
1791                         appendStringInfo(buffer, "special system relation %s",
1792                                                          relname);
1793                         break;
1794                 case RELKIND_SEQUENCE:
1795                         appendStringInfo(buffer, "sequence %s",
1796                                                          relname);
1797                         break;
1798                 case RELKIND_UNCATALOGED:
1799                         appendStringInfo(buffer, "uncataloged table %s",
1800                                                          relname);
1801                         break;
1802                 case RELKIND_TOASTVALUE:
1803                         appendStringInfo(buffer, "toast table %s",
1804                                                          relname);
1805                         break;
1806                 case RELKIND_VIEW:
1807                         appendStringInfo(buffer, "view %s",
1808                                                          relname);
1809                         break;
1810                 case RELKIND_COMPOSITE_TYPE:
1811                         appendStringInfo(buffer, "composite type %s",
1812                                                          relname);
1813                         break;
1814                 default:
1815                         /* shouldn't get here */
1816                         appendStringInfo(buffer, "relation %s",
1817                                                          relname);
1818                         break;
1819         }
1820
1821         ReleaseSysCache(relTup);
1822 }