]> granicus.if.org Git - postgresql/blob - src/backend/catalog/dependency.c
4090eb3a911703021a00a8e38b4aa564415ebdd7
[postgresql] / src / backend / catalog / dependency.c
1 /*-------------------------------------------------------------------------
2  *
3  * dependency.c
4  *        Routines to support inter-object dependencies.
5  *
6  *
7  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.43 2005/04/14 01:38:15 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "catalog/catname.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_attrdef.h"
26 #include "catalog/pg_cast.h"
27 #include "catalog/pg_constraint.h"
28 #include "catalog/pg_conversion.h"
29 #include "catalog/pg_depend.h"
30 #include "catalog/pg_language.h"
31 #include "catalog/pg_opclass.h"
32 #include "catalog/pg_proc.h"
33 #include "catalog/pg_rewrite.h"
34 #include "catalog/pg_trigger.h"
35 #include "catalog/pg_type.h"
36 #include "commands/comment.h"
37 #include "commands/defrem.h"
38 #include "commands/proclang.h"
39 #include "commands/schemacmds.h"
40 #include "commands/trigger.h"
41 #include "commands/typecmds.h"
42 #include "lib/stringinfo.h"
43 #include "miscadmin.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parsetree.h"
46 #include "rewrite/rewriteRemove.h"
47 #include "utils/builtins.h"
48 #include "utils/fmgroids.h"
49 #include "utils/lsyscache.h"
50 #include "utils/syscache.h"
51
52
53 /* expansible list of ObjectAddresses */
54 typedef struct
55 {
56         ObjectAddress *refs;            /* => palloc'd array */
57         int                     numrefs;                /* current number of references */
58         int                     maxrefs;                /* current size of palloc'd array */
59 } ObjectAddresses;
60
61 /* for find_expr_references_walker */
62 typedef struct
63 {
64         ObjectAddresses addrs;          /* addresses being accumulated */
65         List       *rtables;            /* list of rangetables to resolve Vars */
66 } find_expr_references_context;
67
68
69 /*
70  * Because not all system catalogs have predetermined OIDs, we build a table
71  * mapping between ObjectClasses and OIDs.      This is done at most once per
72  * backend run, to minimize lookup overhead.
73  */
74 static bool object_classes_initialized = false;
75 static Oid      object_classes[MAX_OCLASS];
76
77
78 static void findAutoDeletableObjects(const ObjectAddress *object,
79                                                  ObjectAddresses *oktodelete,
80                                                  Relation depRel);
81 static bool recursiveDeletion(const ObjectAddress *object,
82                                   DropBehavior behavior,
83                                   int msglevel,
84                                   const ObjectAddress *callingObject,
85                                   ObjectAddresses *oktodelete,
86                                   Relation depRel);
87 static bool deleteDependentObjects(const ObjectAddress *object,
88                                            const char *objDescription,
89                                            DropBehavior behavior,
90                                            int msglevel,
91                                            ObjectAddresses *oktodelete,
92                                            Relation depRel);
93 static void doDeletion(const ObjectAddress *object);
94 static bool find_expr_references_walker(Node *node,
95                                                         find_expr_references_context *context);
96 static void eliminate_duplicate_dependencies(ObjectAddresses *addrs);
97 static int      object_address_comparator(const void *a, const void *b);
98 static void init_object_addresses(ObjectAddresses *addrs);
99 static void add_object_address(ObjectClass oclass, Oid objectId, int32 subId,
100                                    ObjectAddresses *addrs);
101 static void add_exact_object_address(const ObjectAddress *object,
102                                                  ObjectAddresses *addrs);
103 static bool object_address_present(const ObjectAddress *object,
104                                            ObjectAddresses *addrs);
105 static void term_object_addresses(ObjectAddresses *addrs);
106 static void init_object_classes(void);
107 static void getRelationDescription(StringInfo buffer, Oid relid);
108
109
110 /*
111  * performDeletion: attempt to drop the specified object.  If CASCADE
112  * behavior is specified, also drop any dependent objects (recursively).
113  * If RESTRICT behavior is specified, error out if there are any dependent
114  * objects, except for those that should be implicitly dropped anyway
115  * according to the dependency type.
116  *
117  * This is the outer control routine for all forms of DROP that drop objects
118  * that can participate in dependencies.
119  */
120 void
121 performDeletion(const ObjectAddress *object,
122                                 DropBehavior behavior)
123 {
124         char       *objDescription;
125         Relation        depRel;
126         ObjectAddresses oktodelete;
127
128         /*
129          * Get object description for possible use in failure message. Must do
130          * this before deleting it ...
131          */
132         objDescription = getObjectDescription(object);
133
134         /*
135          * We save some cycles by opening pg_depend just once and passing the
136          * Relation pointer down to all the recursive deletion steps.
137          */
138         depRel = heap_openr(DependRelationName, RowExclusiveLock);
139
140         /*
141          * Construct a list of objects that are reachable by AUTO or INTERNAL
142          * dependencies from the target object.  These should be deleted
143          * silently, even if the actual deletion pass first reaches one of
144          * them via a non-auto dependency.
145          */
146         init_object_addresses(&oktodelete);
147
148         findAutoDeletableObjects(object, &oktodelete, depRel);
149
150         if (!recursiveDeletion(object, behavior, NOTICE,
151                                                    NULL, &oktodelete, depRel))
152                 ereport(ERROR,
153                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
154                           errmsg("cannot drop %s because other objects depend on it",
155                                          objDescription),
156                                  errhint("Use DROP ... CASCADE to drop the dependent objects too.")));
157
158         term_object_addresses(&oktodelete);
159
160         heap_close(depRel, RowExclusiveLock);
161
162         pfree(objDescription);
163 }
164
165
166 /*
167  * deleteWhatDependsOn: attempt to drop everything that depends on the
168  * specified object, though not the object itself.      Behavior is always
169  * CASCADE.
170  *
171  * This is currently used only to clean out the contents of a schema
172  * (namespace): the passed object is a namespace.  We normally want this
173  * to be done silently, so there's an option to suppress NOTICE messages.
174  */
175 void
176 deleteWhatDependsOn(const ObjectAddress *object,
177                                         bool showNotices)
178 {
179         char       *objDescription;
180         Relation        depRel;
181         ObjectAddresses oktodelete;
182
183         /*
184          * Get object description for possible use in failure messages
185          */
186         objDescription = getObjectDescription(object);
187
188         /*
189          * We save some cycles by opening pg_depend just once and passing the
190          * Relation pointer down to all the recursive deletion steps.
191          */
192         depRel = heap_openr(DependRelationName, RowExclusiveLock);
193
194         /*
195          * Construct a list of objects that are reachable by AUTO or INTERNAL
196          * dependencies from the target object.  These should be deleted
197          * silently, even if the actual deletion pass first reaches one of
198          * them via a non-auto dependency.
199          */
200         init_object_addresses(&oktodelete);
201
202         findAutoDeletableObjects(object, &oktodelete, depRel);
203
204         /*
205          * Now invoke only step 2 of recursiveDeletion: just recurse to the
206          * stuff dependent on the given object.
207          */
208         if (!deleteDependentObjects(object, objDescription,
209                                                                 DROP_CASCADE,
210                                                                 showNotices ? NOTICE : DEBUG2,
211                                                                 &oktodelete, depRel))
212                 ereport(ERROR,
213                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
214                                  errmsg("failed to drop all objects depending on %s",
215                                                 objDescription)));
216
217         /*
218          * We do not need CommandCounterIncrement here, since if step 2 did
219          * anything then each recursive call will have ended with one.
220          */
221
222         term_object_addresses(&oktodelete);
223
224         heap_close(depRel, RowExclusiveLock);
225
226         pfree(objDescription);
227 }
228
229
230 /*
231  * findAutoDeletableObjects: find all objects that are reachable by AUTO or
232  * INTERNAL dependency paths from the given object.  Add them all to the
233  * oktodelete list.  Note that the originally given object will also be
234  * added to the list.
235  *
236  * depRel is the already-open pg_depend relation.
237  */
238 static void
239 findAutoDeletableObjects(const ObjectAddress *object,
240                                                  ObjectAddresses *oktodelete,
241                                                  Relation depRel)
242 {
243         ScanKeyData key[3];
244         int                     nkeys;
245         SysScanDesc scan;
246         HeapTuple       tup;
247         ObjectAddress otherObject;
248
249         /*
250          * If this object is already in oktodelete, then we already visited
251          * it; don't do so again (this prevents infinite recursion if there's
252          * a loop in pg_depend).  Otherwise, add it.
253          */
254         if (object_address_present(object, oktodelete))
255                 return;
256         add_exact_object_address(object, oktodelete);
257
258         /*
259          * Scan pg_depend records that link to this object, showing the things
260          * that depend on it.  For each one that is AUTO or INTERNAL, visit
261          * the referencing object.
262          *
263          * When dropping a whole object (subId = 0), find pg_depend records for
264          * its sub-objects too.
265          */
266         ScanKeyInit(&key[0],
267                                 Anum_pg_depend_refclassid,
268                                 BTEqualStrategyNumber, F_OIDEQ,
269                                 ObjectIdGetDatum(object->classId));
270         ScanKeyInit(&key[1],
271                                 Anum_pg_depend_refobjid,
272                                 BTEqualStrategyNumber, F_OIDEQ,
273                                 ObjectIdGetDatum(object->objectId));
274         if (object->objectSubId != 0)
275         {
276                 ScanKeyInit(&key[2],
277                                         Anum_pg_depend_refobjsubid,
278                                         BTEqualStrategyNumber, F_INT4EQ,
279                                         Int32GetDatum(object->objectSubId));
280                 nkeys = 3;
281         }
282         else
283                 nkeys = 2;
284
285         scan = systable_beginscan(depRel, DependReferenceIndex, true,
286                                                           SnapshotNow, nkeys, key);
287
288         while (HeapTupleIsValid(tup = systable_getnext(scan)))
289         {
290                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
291
292                 switch (foundDep->deptype)
293                 {
294                         case DEPENDENCY_NORMAL:
295                                 /* ignore */
296                                 break;
297                         case DEPENDENCY_AUTO:
298                         case DEPENDENCY_INTERNAL:
299                                 /* recurse */
300                                 otherObject.classId = foundDep->classid;
301                                 otherObject.objectId = foundDep->objid;
302                                 otherObject.objectSubId = foundDep->objsubid;
303                                 findAutoDeletableObjects(&otherObject, oktodelete, depRel);
304                                 break;
305                         case DEPENDENCY_PIN:
306
307                                 /*
308                                  * For a PIN dependency we just ereport immediately; there
309                                  * won't be any others to examine, and we aren't ever
310                                  * going to let the user delete it.
311                                  */
312                                 ereport(ERROR,
313                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
314                                                  errmsg("cannot drop %s because it is required by the database system",
315                                                                 getObjectDescription(object))));
316                                 break;
317                         default:
318                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
319                                          foundDep->deptype, getObjectDescription(object));
320                                 break;
321                 }
322         }
323
324         systable_endscan(scan);
325 }
326
327
328 /*
329  * recursiveDeletion: delete a single object for performDeletion, plus
330  * (recursively) anything that depends on it.
331  *
332  * Returns TRUE if successful, FALSE if not.
333  *
334  * callingObject is NULL at the outer level, else identifies the object that
335  * we recursed from (the reference object that someone else needs to delete).
336  *
337  * oktodelete is a list of objects verified deletable (ie, reachable by one
338  * or more AUTO or INTERNAL dependencies from the original target).
339  *
340  * depRel is the already-open pg_depend relation.
341  *
342  *
343  * In RESTRICT mode, we perform all the deletions anyway, but ereport a message
344  * and return FALSE if we find a restriction violation.  performDeletion
345  * will then abort the transaction to nullify the deletions.  We have to
346  * do it this way to (a) report all the direct and indirect dependencies
347  * while (b) not going into infinite recursion if there's a cycle.
348  *
349  * This is even more complex than one could wish, because it is possible for
350  * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL
351  * dependencies.  Also, we might have a situation where we've been asked to
352  * delete object A, and objects B and C both have AUTO dependencies on A,
353  * but B also has a NORMAL dependency on C.  (Since any of these paths might
354  * be indirect, we can't prevent these scenarios, but must cope instead.)
355  * If we visit C before B then we would mistakenly decide that the B->C link
356  * should prevent the restricted drop from occurring.  To handle this, we make
357  * a pre-scan to find all the objects that are auto-deletable from A.  If we
358  * visit C first, but B is present in the oktodelete list, then we make no
359  * complaint but recurse to delete B anyway.  (Note that in general we must
360  * delete B before deleting C; the drop routine for B may try to access C.)
361  *
362  * Note: in the case where the path to B is traversed first, we will not
363  * see the NORMAL dependency when we reach C, because of the pg_depend
364  * removals done in step 1.  The oktodelete list is necessary just
365  * to make the behavior independent of the order in which pg_depend
366  * entries are visited.
367  */
368 static bool
369 recursiveDeletion(const ObjectAddress *object,
370                                   DropBehavior behavior,
371                                   int msglevel,
372                                   const ObjectAddress *callingObject,
373                                   ObjectAddresses *oktodelete,
374                                   Relation depRel)
375 {
376         bool            ok = true;
377         char       *objDescription;
378         ScanKeyData key[3];
379         int                     nkeys;
380         SysScanDesc scan;
381         HeapTuple       tup;
382         ObjectAddress otherObject;
383         ObjectAddress owningObject;
384         bool            amOwned = false;
385
386         /*
387          * Get object description for possible use in messages.  Must do this
388          * before deleting it ...
389          */
390         objDescription = getObjectDescription(object);
391
392         /*
393          * Step 1: find and remove pg_depend records that link from this
394          * object to others.  We have to do this anyway, and doing it first
395          * ensures that we avoid infinite recursion in the case of cycles.
396          * Also, some dependency types require extra processing here.
397          *
398          * When dropping a whole object (subId = 0), remove all pg_depend records
399          * for its sub-objects too.
400          */
401         ScanKeyInit(&key[0],
402                                 Anum_pg_depend_classid,
403                                 BTEqualStrategyNumber, F_OIDEQ,
404                                 ObjectIdGetDatum(object->classId));
405         ScanKeyInit(&key[1],
406                                 Anum_pg_depend_objid,
407                                 BTEqualStrategyNumber, F_OIDEQ,
408                                 ObjectIdGetDatum(object->objectId));
409         if (object->objectSubId != 0)
410         {
411                 ScanKeyInit(&key[2],
412                                         Anum_pg_depend_objsubid,
413                                         BTEqualStrategyNumber, F_INT4EQ,
414                                         Int32GetDatum(object->objectSubId));
415                 nkeys = 3;
416         }
417         else
418                 nkeys = 2;
419
420         scan = systable_beginscan(depRel, DependDependerIndex, true,
421                                                           SnapshotNow, nkeys, key);
422
423         while (HeapTupleIsValid(tup = systable_getnext(scan)))
424         {
425                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
426
427                 otherObject.classId = foundDep->refclassid;
428                 otherObject.objectId = foundDep->refobjid;
429                 otherObject.objectSubId = foundDep->refobjsubid;
430
431                 switch (foundDep->deptype)
432                 {
433                         case DEPENDENCY_NORMAL:
434                         case DEPENDENCY_AUTO:
435                                 /* no problem */
436                                 break;
437                         case DEPENDENCY_INTERNAL:
438
439                                 /*
440                                  * This object is part of the internal implementation of
441                                  * another object.      We have three cases:
442                                  *
443                                  * 1. At the outermost recursion level, disallow the DROP.
444                                  * (We just ereport here, rather than proceeding, since no
445                                  * other dependencies are likely to be interesting.)
446                                  */
447                                 if (callingObject == NULL)
448                                 {
449                                         char       *otherObjDesc = getObjectDescription(&otherObject);
450
451                                         ereport(ERROR,
452                                                  (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
453                                                   errmsg("cannot drop %s because %s requires it",
454                                                                  objDescription, otherObjDesc),
455                                                   errhint("You may drop %s instead.",
456                                                                   otherObjDesc)));
457                                 }
458
459                                 /*
460                                  * 2. When recursing from the other end of this
461                                  * dependency, it's okay to continue with the deletion.
462                                  * This holds when recursing from a whole object that
463                                  * includes the nominal other end as a component, too.
464                                  */
465                                 if (callingObject->classId == otherObject.classId &&
466                                         callingObject->objectId == otherObject.objectId &&
467                                 (callingObject->objectSubId == otherObject.objectSubId ||
468                                  callingObject->objectSubId == 0))
469                                         break;
470
471                                 /*
472                                  * 3. When recursing from anyplace else, transform this
473                                  * deletion request into a delete of the other object.
474                                  * (This will be an error condition iff RESTRICT mode.) In
475                                  * this case we finish deleting my dependencies except for
476                                  * the INTERNAL link, which will be needed to cause the
477                                  * owning object to recurse back to me.
478                                  */
479                                 if (amOwned)    /* shouldn't happen */
480                                         elog(ERROR, "multiple INTERNAL dependencies for %s",
481                                                  objDescription);
482                                 owningObject = otherObject;
483                                 amOwned = true;
484                                 /* "continue" bypasses the simple_heap_delete call below */
485                                 continue;
486                         case DEPENDENCY_PIN:
487
488                                 /*
489                                  * Should not happen; PIN dependencies should have zeroes
490                                  * in the depender fields...
491                                  */
492                                 elog(ERROR, "incorrect use of PIN dependency with %s",
493                                          objDescription);
494                                 break;
495                         default:
496                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
497                                          foundDep->deptype, objDescription);
498                                 break;
499                 }
500
501                 simple_heap_delete(depRel, &tup->t_self);
502         }
503
504         systable_endscan(scan);
505
506         /*
507          * CommandCounterIncrement here to ensure that preceding changes are
508          * all visible; in particular, that the above deletions of pg_depend
509          * entries are visible.  That prevents infinite recursion in case of a
510          * dependency loop (which is perfectly legal).
511          */
512         CommandCounterIncrement();
513
514         /*
515          * If we found we are owned by another object, ask it to delete itself
516          * instead of proceeding.  Complain if RESTRICT mode, unless the other
517          * object is in oktodelete.
518          */
519         if (amOwned)
520         {
521                 if (object_address_present(&owningObject, oktodelete))
522                         ereport(DEBUG2,
523                                         (errmsg("drop auto-cascades to %s",
524                                                         getObjectDescription(&owningObject))));
525                 else if (behavior == DROP_RESTRICT)
526                 {
527                         ereport(msglevel,
528                                         (errmsg("%s depends on %s",
529                                                         getObjectDescription(&owningObject),
530                                                         objDescription)));
531                         ok = false;
532                 }
533                 else
534                         ereport(msglevel,
535                                         (errmsg("drop cascades to %s",
536                                                         getObjectDescription(&owningObject))));
537
538                 if (!recursiveDeletion(&owningObject, behavior, msglevel,
539                                                            object, oktodelete, depRel))
540                         ok = false;
541
542                 pfree(objDescription);
543
544                 return ok;
545         }
546
547         /*
548          * Step 2: scan pg_depend records that link to this object, showing
549          * the things that depend on it.  Recursively delete those things.
550          * Note it's important to delete the dependent objects before the
551          * referenced one, since the deletion routines might do things like
552          * try to update the pg_class record when deleting a check constraint.
553          */
554         if (!deleteDependentObjects(object, objDescription,
555                                                                 behavior, msglevel,
556                                                                 oktodelete, depRel))
557                 ok = false;
558
559         /*
560          * We do not need CommandCounterIncrement here, since if step 2 did
561          * anything then each recursive call will have ended with one.
562          */
563
564         /*
565          * Step 3: delete the object itself.
566          */
567         doDeletion(object);
568
569         /*
570          * Delete any comments associated with this object.  (This is a
571          * convenient place to do it instead of having every object type know
572          * to do it.)
573          */
574         DeleteComments(object->objectId, object->classId, object->objectSubId);
575
576         /*
577          * CommandCounterIncrement here to ensure that preceding changes are
578          * all visible.
579          */
580         CommandCounterIncrement();
581
582         /*
583          * And we're done!
584          */
585         pfree(objDescription);
586
587         return ok;
588 }
589
590
591 /*
592  * deleteDependentObjects - find and delete objects that depend on 'object'
593  *
594  * Scan pg_depend records that link to the given object, showing
595  * the things that depend on it.  Recursively delete those things. (We
596  * don't delete the pg_depend records here, as the recursive call will
597  * do that.)  Note it's important to delete the dependent objects
598  * before the referenced one, since the deletion routines might do
599  * things like try to update the pg_class record when deleting a check
600  * constraint.
601  *
602  * When dropping a whole object (subId = 0), find pg_depend records for
603  * its sub-objects too.
604  *
605  *      object: the object to find dependencies on
606  *      objDescription: description of object (only used for error messages)
607  *      behavior: desired drop behavior
608  *      oktodelete: stuff that's AUTO-deletable
609  *      depRel: already opened pg_depend relation
610  *
611  * Returns TRUE if all is well, false if any problem found.
612  *
613  * NOTE: because we are using SnapshotNow, if a recursive call deletes
614  * any pg_depend tuples that our scan hasn't yet visited, we will not
615  * see them as good when we do visit them.      This is essential for
616  * correct behavior if there are multiple dependency paths between two
617  * objects --- else we might try to delete an already-deleted object.
618  */
619 static bool
620 deleteDependentObjects(const ObjectAddress *object,
621                                            const char *objDescription,
622                                            DropBehavior behavior,
623                                            int msglevel,
624                                            ObjectAddresses *oktodelete,
625                                            Relation depRel)
626 {
627         bool            ok = true;
628         ScanKeyData key[3];
629         int                     nkeys;
630         SysScanDesc scan;
631         HeapTuple       tup;
632         ObjectAddress otherObject;
633
634         ScanKeyInit(&key[0],
635                                 Anum_pg_depend_refclassid,
636                                 BTEqualStrategyNumber, F_OIDEQ,
637                                 ObjectIdGetDatum(object->classId));
638         ScanKeyInit(&key[1],
639                                 Anum_pg_depend_refobjid,
640                                 BTEqualStrategyNumber, F_OIDEQ,
641                                 ObjectIdGetDatum(object->objectId));
642         if (object->objectSubId != 0)
643         {
644                 ScanKeyInit(&key[2],
645                                         Anum_pg_depend_refobjsubid,
646                                         BTEqualStrategyNumber, F_INT4EQ,
647                                         Int32GetDatum(object->objectSubId));
648                 nkeys = 3;
649         }
650         else
651                 nkeys = 2;
652
653         scan = systable_beginscan(depRel, DependReferenceIndex, true,
654                                                           SnapshotNow, nkeys, key);
655
656         while (HeapTupleIsValid(tup = systable_getnext(scan)))
657         {
658                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
659
660                 otherObject.classId = foundDep->classid;
661                 otherObject.objectId = foundDep->objid;
662                 otherObject.objectSubId = foundDep->objsubid;
663
664                 switch (foundDep->deptype)
665                 {
666                         case DEPENDENCY_NORMAL:
667
668                                 /*
669                                  * Perhaps there was another dependency path that would
670                                  * have allowed silent deletion of the otherObject, had we
671                                  * only taken that path first. In that case, act like this
672                                  * link is AUTO, too.
673                                  */
674                                 if (object_address_present(&otherObject, oktodelete))
675                                         ereport(DEBUG2,
676                                                         (errmsg("drop auto-cascades to %s",
677                                                                         getObjectDescription(&otherObject))));
678                                 else if (behavior == DROP_RESTRICT)
679                                 {
680                                         ereport(msglevel,
681                                                         (errmsg("%s depends on %s",
682                                                                         getObjectDescription(&otherObject),
683                                                                         objDescription)));
684                                         ok = false;
685                                 }
686                                 else
687                                         ereport(msglevel,
688                                                         (errmsg("drop cascades to %s",
689                                                                         getObjectDescription(&otherObject))));
690
691                                 if (!recursiveDeletion(&otherObject, behavior, msglevel,
692                                                                            object, oktodelete, depRel))
693                                         ok = false;
694                                 break;
695                         case DEPENDENCY_AUTO:
696                         case DEPENDENCY_INTERNAL:
697
698                                 /*
699                                  * We propagate the DROP without complaint even in the
700                                  * RESTRICT case.  (However, normal dependencies on the
701                                  * component object could still cause failure.)
702                                  */
703                                 ereport(DEBUG2,
704                                                 (errmsg("drop auto-cascades to %s",
705                                                                 getObjectDescription(&otherObject))));
706
707                                 if (!recursiveDeletion(&otherObject, behavior, msglevel,
708                                                                            object, oktodelete, depRel))
709                                         ok = false;
710                                 break;
711                         case DEPENDENCY_PIN:
712
713                                 /*
714                                  * For a PIN dependency we just ereport immediately; there
715                                  * won't be any others to report.
716                                  */
717                                 ereport(ERROR,
718                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
719                                                  errmsg("cannot drop %s because it is required by the database system",
720                                                                 objDescription)));
721                                 break;
722                         default:
723                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
724                                          foundDep->deptype, objDescription);
725                                 break;
726                 }
727         }
728
729         systable_endscan(scan);
730
731         return ok;
732 }
733
734
735 /*
736  * doDeletion: actually delete a single object
737  */
738 static void
739 doDeletion(const ObjectAddress *object)
740 {
741         switch (getObjectClass(object))
742         {
743                 case OCLASS_CLASS:
744                         {
745                                 char            relKind = get_rel_relkind(object->objectId);
746
747                                 if (relKind == RELKIND_INDEX)
748                                 {
749                                         Assert(object->objectSubId == 0);
750                                         index_drop(object->objectId);
751                                 }
752                                 else
753                                 {
754                                         if (object->objectSubId != 0)
755                                                 RemoveAttributeById(object->objectId,
756                                                                                         object->objectSubId);
757                                         else
758                                                 heap_drop_with_catalog(object->objectId);
759                                 }
760                                 break;
761                         }
762
763                 case OCLASS_PROC:
764                         RemoveFunctionById(object->objectId);
765                         break;
766
767                 case OCLASS_TYPE:
768                         RemoveTypeById(object->objectId);
769                         break;
770
771                 case OCLASS_CAST:
772                         DropCastById(object->objectId);
773                         break;
774
775                 case OCLASS_CONSTRAINT:
776                         RemoveConstraintById(object->objectId);
777                         break;
778
779                 case OCLASS_CONVERSION:
780                         RemoveConversionById(object->objectId);
781                         break;
782
783                 case OCLASS_DEFAULT:
784                         RemoveAttrDefaultById(object->objectId);
785                         break;
786
787                 case OCLASS_LANGUAGE:
788                         DropProceduralLanguageById(object->objectId);
789                         break;
790
791                 case OCLASS_OPERATOR:
792                         RemoveOperatorById(object->objectId);
793                         break;
794
795                 case OCLASS_OPCLASS:
796                         RemoveOpClassById(object->objectId);
797                         break;
798
799                 case OCLASS_REWRITE:
800                         RemoveRewriteRuleById(object->objectId);
801                         break;
802
803                 case OCLASS_TRIGGER:
804                         RemoveTriggerById(object->objectId);
805                         break;
806
807                 case OCLASS_SCHEMA:
808                         RemoveSchemaById(object->objectId);
809                         break;
810
811                 default:
812                         elog(ERROR, "unrecognized object class: %u",
813                                  object->classId);
814         }
815 }
816
817 /*
818  * recordDependencyOnExpr - find expression dependencies
819  *
820  * This is used to find the dependencies of rules, constraint expressions,
821  * etc.
822  *
823  * Given an expression or query in node-tree form, find all the objects
824  * it refers to (tables, columns, operators, functions, etc).  Record
825  * a dependency of the specified type from the given depender object
826  * to each object mentioned in the expression.
827  *
828  * rtable is the rangetable to be used to interpret Vars with varlevelsup=0.
829  * It can be NIL if no such variables are expected.
830  *
831  * XXX is it important to create dependencies on the datatypes mentioned in
832  * the expression?      In most cases this would be redundant (eg, a ref to an
833  * operator indirectly references its input and output datatypes), but I'm
834  * not quite convinced there are no cases where we need it.
835  */
836 void
837 recordDependencyOnExpr(const ObjectAddress *depender,
838                                            Node *expr, List *rtable,
839                                            DependencyType behavior)
840 {
841         find_expr_references_context context;
842
843         init_object_addresses(&context.addrs);
844
845         /* Set up interpretation for Vars at varlevelsup = 0 */
846         context.rtables = list_make1(rtable);
847
848         /* Scan the expression tree for referenceable objects */
849         find_expr_references_walker(expr, &context);
850
851         /* Remove any duplicates */
852         eliminate_duplicate_dependencies(&context.addrs);
853
854         /* And record 'em */
855         recordMultipleDependencies(depender,
856                                                            context.addrs.refs, context.addrs.numrefs,
857                                                            behavior);
858
859         term_object_addresses(&context.addrs);
860 }
861
862 /*
863  * recordDependencyOnSingleRelExpr - find expression dependencies
864  *
865  * As above, but only one relation is expected to be referenced (with
866  * varno = 1 and varlevelsup = 0).      Pass the relation OID instead of a
867  * range table.  An additional frammish is that dependencies on that
868  * relation (or its component columns) will be marked with 'self_behavior',
869  * whereas 'behavior' is used for everything else.
870  */
871 void
872 recordDependencyOnSingleRelExpr(const ObjectAddress *depender,
873                                                                 Node *expr, Oid relId,
874                                                                 DependencyType behavior,
875                                                                 DependencyType self_behavior)
876 {
877         find_expr_references_context context;
878         RangeTblEntry rte;
879
880         init_object_addresses(&context.addrs);
881
882         /* We gin up a rather bogus rangetable list to handle Vars */
883         MemSet(&rte, 0, sizeof(rte));
884         rte.type = T_RangeTblEntry;
885         rte.rtekind = RTE_RELATION;
886         rte.relid = relId;
887
888         context.rtables = list_make1(list_make1(&rte));
889
890         /* Scan the expression tree for referenceable objects */
891         find_expr_references_walker(expr, &context);
892
893         /* Remove any duplicates */
894         eliminate_duplicate_dependencies(&context.addrs);
895
896         /* Separate self-dependencies if necessary */
897         if (behavior != self_behavior && context.addrs.numrefs > 0)
898         {
899                 ObjectAddresses self_addrs;
900                 ObjectAddress *outobj;
901                 int                     oldref,
902                                         outrefs;
903
904                 init_object_addresses(&self_addrs);
905
906                 outobj = context.addrs.refs;
907                 outrefs = 0;
908                 for (oldref = 0; oldref < context.addrs.numrefs; oldref++)
909                 {
910                         ObjectAddress *thisobj = context.addrs.refs + oldref;
911
912                         if (thisobj->classId == RelationRelationId &&
913                                 thisobj->objectId == relId)
914                         {
915                                 /* Move this ref into self_addrs */
916                                 add_object_address(OCLASS_CLASS, relId, thisobj->objectSubId,
917                                                                    &self_addrs);
918                         }
919                         else
920                         {
921                                 /* Keep it in context.addrs */
922                                 outobj->classId = thisobj->classId;
923                                 outobj->objectId = thisobj->objectId;
924                                 outobj->objectSubId = thisobj->objectSubId;
925                                 outobj++;
926                                 outrefs++;
927                         }
928                 }
929                 context.addrs.numrefs = outrefs;
930
931                 /* Record the self-dependencies */
932                 recordMultipleDependencies(depender,
933                                                                    self_addrs.refs, self_addrs.numrefs,
934                                                                    self_behavior);
935
936                 term_object_addresses(&self_addrs);
937         }
938
939         /* Record the external dependencies */
940         recordMultipleDependencies(depender,
941                                                            context.addrs.refs, context.addrs.numrefs,
942                                                            behavior);
943
944         term_object_addresses(&context.addrs);
945 }
946
947 /*
948  * Recursively search an expression tree for object references.
949  *
950  * Note: we avoid creating references to columns of tables that participate
951  * in an SQL JOIN construct, but are not actually used anywhere in the query.
952  * To do so, we do not scan the joinaliasvars list of a join RTE while
953  * scanning the query rangetable, but instead scan each individual entry
954  * of the alias list when we find a reference to it.
955  */
956 static bool
957 find_expr_references_walker(Node *node,
958                                                         find_expr_references_context *context)
959 {
960         if (node == NULL)
961                 return false;
962         if (IsA(node, Var))
963         {
964                 Var                *var = (Var *) node;
965                 List       *rtable;
966                 RangeTblEntry *rte;
967
968                 /* Find matching rtable entry, or complain if not found */
969                 if (var->varlevelsup >= list_length(context->rtables))
970                         elog(ERROR, "invalid varlevelsup %d", var->varlevelsup);
971                 rtable = (List *) list_nth(context->rtables, var->varlevelsup);
972                 if (var->varno <= 0 || var->varno > list_length(rtable))
973                         elog(ERROR, "invalid varno %d", var->varno);
974                 rte = rt_fetch(var->varno, rtable);
975
976                 /*
977                  * A whole-row Var references no specific columns, so adds no new
978                  * dependency.
979                  */
980                 if (var->varattno == InvalidAttrNumber)
981                         return false;
982                 if (rte->rtekind == RTE_RELATION)
983                 {
984                         /* If it's a plain relation, reference this column */
985                         add_object_address(OCLASS_CLASS, rte->relid, var->varattno,
986                                                            &context->addrs);
987                 }
988                 else if (rte->rtekind == RTE_JOIN)
989                 {
990                         /* Scan join output column to add references to join inputs */
991                         List       *save_rtables;
992
993                         /* We must make the context appropriate for join's level */
994                         save_rtables = context->rtables;
995                         context->rtables = list_copy_tail(context->rtables,
996                                                                                           var->varlevelsup);
997                         if (var->varattno <= 0 ||
998                                 var->varattno > list_length(rte->joinaliasvars))
999                                 elog(ERROR, "invalid varattno %d", var->varattno);
1000                         find_expr_references_walker((Node *) list_nth(rte->joinaliasvars,
1001                                                                                                           var->varattno - 1),
1002                                                                                 context);
1003                         list_free(context->rtables);
1004                         context->rtables = save_rtables;
1005                 }
1006                 return false;
1007         }
1008         if (IsA(node, FuncExpr))
1009         {
1010                 FuncExpr   *funcexpr = (FuncExpr *) node;
1011
1012                 add_object_address(OCLASS_PROC, funcexpr->funcid, 0,
1013                                                    &context->addrs);
1014                 /* fall through to examine arguments */
1015         }
1016         if (IsA(node, OpExpr))
1017         {
1018                 OpExpr     *opexpr = (OpExpr *) node;
1019
1020                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1021                                                    &context->addrs);
1022                 /* fall through to examine arguments */
1023         }
1024         if (IsA(node, DistinctExpr))
1025         {
1026                 DistinctExpr *distinctexpr = (DistinctExpr *) node;
1027
1028                 add_object_address(OCLASS_OPERATOR, distinctexpr->opno, 0,
1029                                                    &context->addrs);
1030                 /* fall through to examine arguments */
1031         }
1032         if (IsA(node, ScalarArrayOpExpr))
1033         {
1034                 ScalarArrayOpExpr *opexpr = (ScalarArrayOpExpr *) node;
1035
1036                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1037                                                    &context->addrs);
1038                 /* fall through to examine arguments */
1039         }
1040         if (IsA(node, NullIfExpr))
1041         {
1042                 NullIfExpr *nullifexpr = (NullIfExpr *) node;
1043
1044                 add_object_address(OCLASS_OPERATOR, nullifexpr->opno, 0,
1045                                                    &context->addrs);
1046                 /* fall through to examine arguments */
1047         }
1048         if (IsA(node, Aggref))
1049         {
1050                 Aggref     *aggref = (Aggref *) node;
1051
1052                 add_object_address(OCLASS_PROC, aggref->aggfnoid, 0,
1053                                                    &context->addrs);
1054                 /* fall through to examine arguments */
1055         }
1056         if (IsA(node, SubLink))
1057         {
1058                 SubLink    *sublink = (SubLink *) node;
1059                 ListCell   *opid;
1060
1061                 foreach(opid, sublink->operOids)
1062                 {
1063                         add_object_address(OCLASS_OPERATOR, lfirst_oid(opid), 0,
1064                                                            &context->addrs);
1065                 }
1066                 /* fall through to examine arguments */
1067         }
1068         if (is_subplan(node))
1069         {
1070                 /* Extra work needed here if we ever need this case */
1071                 elog(ERROR, "already-planned subqueries not supported");
1072         }
1073         if (IsA(node, Query))
1074         {
1075                 /* Recurse into RTE subquery or not-yet-planned sublink subquery */
1076                 Query      *query = (Query *) node;
1077                 ListCell   *rtable;
1078                 bool            result;
1079
1080                 /*
1081                  * Add whole-relation refs for each plain relation mentioned in
1082                  * the subquery's rtable.  (Note: query_tree_walker takes care of
1083                  * recursing into RTE_FUNCTION and RTE_SUBQUERY RTEs, so no need
1084                  * to do that here.  But keep it from looking at join alias
1085                  * lists.)
1086                  */
1087                 foreach(rtable, query->rtable)
1088                 {
1089                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
1090
1091                         if (rte->rtekind == RTE_RELATION)
1092                                 add_object_address(OCLASS_CLASS, rte->relid, 0,
1093                                                                    &context->addrs);
1094                 }
1095
1096                 /* Examine substructure of query */
1097                 context->rtables = lcons(query->rtable, context->rtables);
1098                 result = query_tree_walker(query,
1099                                                                    find_expr_references_walker,
1100                                                                    (void *) context,
1101                                                                    QTW_IGNORE_JOINALIASES);
1102                 context->rtables = list_delete_first(context->rtables);
1103                 return result;
1104         }
1105         return expression_tree_walker(node, find_expr_references_walker,
1106                                                                   (void *) context);
1107 }
1108
1109 /*
1110  * Given an array of dependency references, eliminate any duplicates.
1111  */
1112 static void
1113 eliminate_duplicate_dependencies(ObjectAddresses *addrs)
1114 {
1115         ObjectAddress *priorobj;
1116         int                     oldref,
1117                                 newrefs;
1118
1119         if (addrs->numrefs <= 1)
1120                 return;                                 /* nothing to do */
1121
1122         /* Sort the refs so that duplicates are adjacent */
1123         qsort((void *) addrs->refs, addrs->numrefs, sizeof(ObjectAddress),
1124                   object_address_comparator);
1125
1126         /* Remove dups */
1127         priorobj = addrs->refs;
1128         newrefs = 1;
1129         for (oldref = 1; oldref < addrs->numrefs; oldref++)
1130         {
1131                 ObjectAddress *thisobj = addrs->refs + oldref;
1132
1133                 if (priorobj->classId == thisobj->classId &&
1134                         priorobj->objectId == thisobj->objectId)
1135                 {
1136                         if (priorobj->objectSubId == thisobj->objectSubId)
1137                                 continue;               /* identical, so drop thisobj */
1138
1139                         /*
1140                          * If we have a whole-object reference and a reference to a
1141                          * part of the same object, we don't need the whole-object
1142                          * reference (for example, we don't need to reference both
1143                          * table foo and column foo.bar).  The whole-object reference
1144                          * will always appear first in the sorted list.
1145                          */
1146                         if (priorobj->objectSubId == 0)
1147                         {
1148                                 /* replace whole ref with partial */
1149                                 priorobj->objectSubId = thisobj->objectSubId;
1150                                 continue;
1151                         }
1152                 }
1153                 /* Not identical, so add thisobj to output set */
1154                 priorobj++;
1155                 priorobj->classId = thisobj->classId;
1156                 priorobj->objectId = thisobj->objectId;
1157                 priorobj->objectSubId = thisobj->objectSubId;
1158                 newrefs++;
1159         }
1160
1161         addrs->numrefs = newrefs;
1162 }
1163
1164 /*
1165  * qsort comparator for ObjectAddress items
1166  */
1167 static int
1168 object_address_comparator(const void *a, const void *b)
1169 {
1170         const ObjectAddress *obja = (const ObjectAddress *) a;
1171         const ObjectAddress *objb = (const ObjectAddress *) b;
1172
1173         if (obja->classId < objb->classId)
1174                 return -1;
1175         if (obja->classId > objb->classId)
1176                 return 1;
1177         if (obja->objectId < objb->objectId)
1178                 return -1;
1179         if (obja->objectId > objb->objectId)
1180                 return 1;
1181
1182         /*
1183          * We sort the subId as an unsigned int so that 0 will come first. See
1184          * logic in eliminate_duplicate_dependencies.
1185          */
1186         if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId)
1187                 return -1;
1188         if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId)
1189                 return 1;
1190         return 0;
1191 }
1192
1193 /*
1194  * Routines for handling an expansible array of ObjectAddress items.
1195  *
1196  * init_object_addresses: initialize an ObjectAddresses array.
1197  */
1198 static void
1199 init_object_addresses(ObjectAddresses *addrs)
1200 {
1201         /* Initialize array to empty */
1202         addrs->numrefs = 0;
1203         addrs->maxrefs = 32;            /* arbitrary initial array size */
1204         addrs->refs = (ObjectAddress *)
1205                 palloc(addrs->maxrefs * sizeof(ObjectAddress));
1206
1207         /* Initialize object_classes[] if not done yet */
1208         /* This will be needed by add_object_address() */
1209         if (!object_classes_initialized)
1210                 init_object_classes();
1211 }
1212
1213 /*
1214  * Add an entry to an ObjectAddresses array.
1215  *
1216  * It is convenient to specify the class by ObjectClass rather than directly
1217  * by catalog OID.
1218  */
1219 static void
1220 add_object_address(ObjectClass oclass, Oid objectId, int32 subId,
1221                                    ObjectAddresses *addrs)
1222 {
1223         ObjectAddress *item;
1224
1225         /* enlarge array if needed */
1226         if (addrs->numrefs >= addrs->maxrefs)
1227         {
1228                 addrs->maxrefs *= 2;
1229                 addrs->refs = (ObjectAddress *)
1230                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1231         }
1232         /* record this item */
1233         item = addrs->refs + addrs->numrefs;
1234         item->classId = object_classes[oclass];
1235         item->objectId = objectId;
1236         item->objectSubId = subId;
1237         addrs->numrefs++;
1238 }
1239
1240 /*
1241  * Add an entry to an ObjectAddresses array.
1242  *
1243  * As above, but specify entry exactly.
1244  */
1245 static void
1246 add_exact_object_address(const ObjectAddress *object,
1247                                                  ObjectAddresses *addrs)
1248 {
1249         ObjectAddress *item;
1250
1251         /* enlarge array if needed */
1252         if (addrs->numrefs >= addrs->maxrefs)
1253         {
1254                 addrs->maxrefs *= 2;
1255                 addrs->refs = (ObjectAddress *)
1256                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1257         }
1258         /* record this item */
1259         item = addrs->refs + addrs->numrefs;
1260         *item = *object;
1261         addrs->numrefs++;
1262 }
1263
1264 /*
1265  * Test whether an object is present in an ObjectAddresses array.
1266  *
1267  * We return "true" if object is a subobject of something in the array, too.
1268  */
1269 static bool
1270 object_address_present(const ObjectAddress *object,
1271                                            ObjectAddresses *addrs)
1272 {
1273         int                     i;
1274
1275         for (i = addrs->numrefs - 1; i >= 0; i--)
1276         {
1277                 ObjectAddress *thisobj = addrs->refs + i;
1278
1279                 if (object->classId == thisobj->classId &&
1280                         object->objectId == thisobj->objectId)
1281                 {
1282                         if (object->objectSubId == thisobj->objectSubId ||
1283                                 thisobj->objectSubId == 0)
1284                                 return true;
1285                 }
1286         }
1287
1288         return false;
1289 }
1290
1291 /*
1292  * Clean up when done with an ObjectAddresses array.
1293  */
1294 static void
1295 term_object_addresses(ObjectAddresses *addrs)
1296 {
1297         pfree(addrs->refs);
1298 }
1299
1300 /*
1301  * Initialize the object_classes[] table.
1302  *
1303  * Although some of these OIDs aren't compile-time constants, they surely
1304  * shouldn't change during a backend's run.  So, we look them up the
1305  * first time through and then cache them.
1306  */
1307 static void
1308 init_object_classes(void)
1309 {
1310         object_classes[OCLASS_CLASS] = RelationRelationId;
1311         object_classes[OCLASS_PROC] = ProcedureRelationId;
1312         object_classes[OCLASS_TYPE] = TypeRelationId;
1313         object_classes[OCLASS_CAST] = get_system_catalog_relid(CastRelationName);
1314         object_classes[OCLASS_CONSTRAINT] = get_system_catalog_relid(ConstraintRelationName);
1315         object_classes[OCLASS_CONVERSION] = get_system_catalog_relid(ConversionRelationName);
1316         object_classes[OCLASS_DEFAULT] = get_system_catalog_relid(AttrDefaultRelationName);
1317         object_classes[OCLASS_LANGUAGE] = get_system_catalog_relid(LanguageRelationName);
1318         object_classes[OCLASS_OPERATOR] = get_system_catalog_relid(OperatorRelationName);
1319         object_classes[OCLASS_OPCLASS] = get_system_catalog_relid(OperatorClassRelationName);
1320         object_classes[OCLASS_REWRITE] = get_system_catalog_relid(RewriteRelationName);
1321         object_classes[OCLASS_TRIGGER] = get_system_catalog_relid(TriggerRelationName);
1322         object_classes[OCLASS_SCHEMA] = get_system_catalog_relid(NamespaceRelationName);
1323         object_classes_initialized = true;
1324 }
1325
1326 /*
1327  * Determine the class of a given object identified by objectAddress.
1328  *
1329  * This function is needed just because some of the system catalogs do
1330  * not have hardwired-at-compile-time OIDs.
1331  */
1332 ObjectClass
1333 getObjectClass(const ObjectAddress *object)
1334 {
1335         /* Easy for the bootstrapped catalogs... */
1336         switch (object->classId)
1337         {
1338                 case RelationRelationId:
1339                         /* caller must check objectSubId */
1340                         return OCLASS_CLASS;
1341
1342                 case ProcedureRelationId:
1343                         Assert(object->objectSubId == 0);
1344                         return OCLASS_PROC;
1345
1346                 case TypeRelationId:
1347                         Assert(object->objectSubId == 0);
1348                         return OCLASS_TYPE;
1349         }
1350
1351         /*
1352          * Handle cases where catalog's OID is not hardwired.
1353          */
1354         if (!object_classes_initialized)
1355                 init_object_classes();
1356
1357         if (object->classId == object_classes[OCLASS_CAST])
1358         {
1359                 Assert(object->objectSubId == 0);
1360                 return OCLASS_CAST;
1361         }
1362         if (object->classId == object_classes[OCLASS_CONSTRAINT])
1363         {
1364                 Assert(object->objectSubId == 0);
1365                 return OCLASS_CONSTRAINT;
1366         }
1367         if (object->classId == object_classes[OCLASS_CONVERSION])
1368         {
1369                 Assert(object->objectSubId == 0);
1370                 return OCLASS_CONVERSION;
1371         }
1372         if (object->classId == object_classes[OCLASS_DEFAULT])
1373         {
1374                 Assert(object->objectSubId == 0);
1375                 return OCLASS_DEFAULT;
1376         }
1377         if (object->classId == object_classes[OCLASS_LANGUAGE])
1378         {
1379                 Assert(object->objectSubId == 0);
1380                 return OCLASS_LANGUAGE;
1381         }
1382         if (object->classId == object_classes[OCLASS_OPERATOR])
1383         {
1384                 Assert(object->objectSubId == 0);
1385                 return OCLASS_OPERATOR;
1386         }
1387         if (object->classId == object_classes[OCLASS_OPCLASS])
1388         {
1389                 Assert(object->objectSubId == 0);
1390                 return OCLASS_OPCLASS;
1391         }
1392         if (object->classId == object_classes[OCLASS_REWRITE])
1393         {
1394                 Assert(object->objectSubId == 0);
1395                 return OCLASS_REWRITE;
1396         }
1397         if (object->classId == object_classes[OCLASS_TRIGGER])
1398         {
1399                 Assert(object->objectSubId == 0);
1400                 return OCLASS_TRIGGER;
1401         }
1402         if (object->classId == object_classes[OCLASS_SCHEMA])
1403         {
1404                 Assert(object->objectSubId == 0);
1405                 return OCLASS_SCHEMA;
1406         }
1407
1408         elog(ERROR, "unrecognized object class: %u", object->classId);
1409         return OCLASS_CLASS;            /* keep compiler quiet */
1410 }
1411
1412 /*
1413  * getObjectDescription: build an object description for messages
1414  *
1415  * The result is a palloc'd string.
1416  */
1417 char *
1418 getObjectDescription(const ObjectAddress *object)
1419 {
1420         StringInfoData buffer;
1421
1422         initStringInfo(&buffer);
1423
1424         switch (getObjectClass(object))
1425         {
1426                 case OCLASS_CLASS:
1427                         getRelationDescription(&buffer, object->objectId);
1428                         if (object->objectSubId != 0)
1429                                 appendStringInfo(&buffer, _(" column %s"),
1430                                                            get_relid_attribute_name(object->objectId,
1431                                                                                                    object->objectSubId));
1432                         break;
1433
1434                 case OCLASS_PROC:
1435                         appendStringInfo(&buffer, _("function %s"),
1436                                                          format_procedure(object->objectId));
1437                         break;
1438
1439                 case OCLASS_TYPE:
1440                         appendStringInfo(&buffer, _("type %s"),
1441                                                          format_type_be(object->objectId));
1442                         break;
1443
1444                 case OCLASS_CAST:
1445                         {
1446                                 Relation        castDesc;
1447                                 ScanKeyData skey[1];
1448                                 SysScanDesc rcscan;
1449                                 HeapTuple       tup;
1450                                 Form_pg_cast castForm;
1451
1452                                 castDesc = heap_openr(CastRelationName, AccessShareLock);
1453
1454                                 ScanKeyInit(&skey[0],
1455                                                         ObjectIdAttributeNumber,
1456                                                         BTEqualStrategyNumber, F_OIDEQ,
1457                                                         ObjectIdGetDatum(object->objectId));
1458
1459                                 rcscan = systable_beginscan(castDesc, CastOidIndex, true,
1460                                                                                         SnapshotNow, 1, skey);
1461
1462                                 tup = systable_getnext(rcscan);
1463
1464                                 if (!HeapTupleIsValid(tup))
1465                                         elog(ERROR, "could not find tuple for cast %u",
1466                                                  object->objectId);
1467
1468                                 castForm = (Form_pg_cast) GETSTRUCT(tup);
1469
1470                                 appendStringInfo(&buffer, _("cast from %s to %s"),
1471                                                                  format_type_be(castForm->castsource),
1472                                                                  format_type_be(castForm->casttarget));
1473
1474                                 systable_endscan(rcscan);
1475                                 heap_close(castDesc, AccessShareLock);
1476                                 break;
1477                         }
1478
1479                 case OCLASS_CONSTRAINT:
1480                         {
1481                                 Relation        conDesc;
1482                                 ScanKeyData skey[1];
1483                                 SysScanDesc rcscan;
1484                                 HeapTuple       tup;
1485                                 Form_pg_constraint con;
1486
1487                                 conDesc = heap_openr(ConstraintRelationName, AccessShareLock);
1488
1489                                 ScanKeyInit(&skey[0],
1490                                                         ObjectIdAttributeNumber,
1491                                                         BTEqualStrategyNumber, F_OIDEQ,
1492                                                         ObjectIdGetDatum(object->objectId));
1493
1494                                 rcscan = systable_beginscan(conDesc, ConstraintOidIndex, true,
1495                                                                                         SnapshotNow, 1, skey);
1496
1497                                 tup = systable_getnext(rcscan);
1498
1499                                 if (!HeapTupleIsValid(tup))
1500                                         elog(ERROR, "could not find tuple for constraint %u",
1501                                                  object->objectId);
1502
1503                                 con = (Form_pg_constraint) GETSTRUCT(tup);
1504
1505                                 if (OidIsValid(con->conrelid))
1506                                 {
1507                                         appendStringInfo(&buffer, _("constraint %s on "),
1508                                                                          NameStr(con->conname));
1509                                         getRelationDescription(&buffer, con->conrelid);
1510                                 }
1511                                 else
1512                                 {
1513                                         appendStringInfo(&buffer, _("constraint %s"),
1514                                                                          NameStr(con->conname));
1515                                 }
1516
1517                                 systable_endscan(rcscan);
1518                                 heap_close(conDesc, AccessShareLock);
1519                                 break;
1520                         }
1521
1522                 case OCLASS_CONVERSION:
1523                         {
1524                                 HeapTuple       conTup;
1525
1526                                 conTup = SearchSysCache(CONOID,
1527                                                                           ObjectIdGetDatum(object->objectId),
1528                                                                                 0, 0, 0);
1529                                 if (!HeapTupleIsValid(conTup))
1530                                         elog(ERROR, "cache lookup failed for conversion %u",
1531                                                  object->objectId);
1532                                 appendStringInfo(&buffer, _("conversion %s"),
1533                                                                  NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname));
1534                                 ReleaseSysCache(conTup);
1535                                 break;
1536                         }
1537
1538                 case OCLASS_DEFAULT:
1539                         {
1540                                 Relation        attrdefDesc;
1541                                 ScanKeyData skey[1];
1542                                 SysScanDesc adscan;
1543                                 HeapTuple       tup;
1544                                 Form_pg_attrdef attrdef;
1545                                 ObjectAddress colobject;
1546
1547                                 attrdefDesc = heap_openr(AttrDefaultRelationName, AccessShareLock);
1548
1549                                 ScanKeyInit(&skey[0],
1550                                                         ObjectIdAttributeNumber,
1551                                                         BTEqualStrategyNumber, F_OIDEQ,
1552                                                         ObjectIdGetDatum(object->objectId));
1553
1554                                 adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndex,
1555                                                                                         true, SnapshotNow, 1, skey);
1556
1557                                 tup = systable_getnext(adscan);
1558
1559                                 if (!HeapTupleIsValid(tup))
1560                                         elog(ERROR, "could not find tuple for attrdef %u",
1561                                                  object->objectId);
1562
1563                                 attrdef = (Form_pg_attrdef) GETSTRUCT(tup);
1564
1565                                 colobject.classId = RelationRelationId;
1566                                 colobject.objectId = attrdef->adrelid;
1567                                 colobject.objectSubId = attrdef->adnum;
1568
1569                                 appendStringInfo(&buffer, _("default for %s"),
1570                                                                  getObjectDescription(&colobject));
1571
1572                                 systable_endscan(adscan);
1573                                 heap_close(attrdefDesc, AccessShareLock);
1574                                 break;
1575                         }
1576
1577                 case OCLASS_LANGUAGE:
1578                         {
1579                                 HeapTuple       langTup;
1580
1581                                 langTup = SearchSysCache(LANGOID,
1582                                                                           ObjectIdGetDatum(object->objectId),
1583                                                                                  0, 0, 0);
1584                                 if (!HeapTupleIsValid(langTup))
1585                                         elog(ERROR, "cache lookup failed for language %u",
1586                                                  object->objectId);
1587                                 appendStringInfo(&buffer, _("language %s"),
1588                                                                  NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname));
1589                                 ReleaseSysCache(langTup);
1590                                 break;
1591                         }
1592
1593                 case OCLASS_OPERATOR:
1594                         appendStringInfo(&buffer, _("operator %s"),
1595                                                          format_operator(object->objectId));
1596                         break;
1597
1598                 case OCLASS_OPCLASS:
1599                         {
1600                                 HeapTuple       opcTup;
1601                                 Form_pg_opclass opcForm;
1602                                 HeapTuple       amTup;
1603                                 Form_pg_am      amForm;
1604                                 char       *nspname;
1605
1606                                 opcTup = SearchSysCache(CLAOID,
1607                                                                           ObjectIdGetDatum(object->objectId),
1608                                                                                 0, 0, 0);
1609                                 if (!HeapTupleIsValid(opcTup))
1610                                         elog(ERROR, "cache lookup failed for opclass %u",
1611                                                  object->objectId);
1612                                 opcForm = (Form_pg_opclass) GETSTRUCT(opcTup);
1613
1614                                 amTup = SearchSysCache(AMOID,
1615                                                                            ObjectIdGetDatum(opcForm->opcamid),
1616                                                                            0, 0, 0);
1617                                 if (!HeapTupleIsValid(amTup))
1618                                         elog(ERROR, "cache lookup failed for access method %u",
1619                                                  opcForm->opcamid);
1620                                 amForm = (Form_pg_am) GETSTRUCT(amTup);
1621
1622                                 /* Qualify the name if not visible in search path */
1623                                 if (OpclassIsVisible(object->objectId))
1624                                         nspname = NULL;
1625                                 else
1626                                         nspname = get_namespace_name(opcForm->opcnamespace);
1627
1628                                 appendStringInfo(&buffer, _("operator class %s for access method %s"),
1629                                                                  quote_qualified_identifier(nspname,
1630                                                                                           NameStr(opcForm->opcname)),
1631                                                                  NameStr(amForm->amname));
1632
1633                                 ReleaseSysCache(amTup);
1634                                 ReleaseSysCache(opcTup);
1635                                 break;
1636                         }
1637
1638                 case OCLASS_REWRITE:
1639                         {
1640                                 Relation        ruleDesc;
1641                                 ScanKeyData skey[1];
1642                                 SysScanDesc rcscan;
1643                                 HeapTuple       tup;
1644                                 Form_pg_rewrite rule;
1645
1646                                 ruleDesc = heap_openr(RewriteRelationName, AccessShareLock);
1647
1648                                 ScanKeyInit(&skey[0],
1649                                                         ObjectIdAttributeNumber,
1650                                                         BTEqualStrategyNumber, F_OIDEQ,
1651                                                         ObjectIdGetDatum(object->objectId));
1652
1653                                 rcscan = systable_beginscan(ruleDesc, RewriteOidIndex, true,
1654                                                                                         SnapshotNow, 1, skey);
1655
1656                                 tup = systable_getnext(rcscan);
1657
1658                                 if (!HeapTupleIsValid(tup))
1659                                         elog(ERROR, "could not find tuple for rule %u",
1660                                                  object->objectId);
1661
1662                                 rule = (Form_pg_rewrite) GETSTRUCT(tup);
1663
1664                                 appendStringInfo(&buffer, _("rule %s on "),
1665                                                                  NameStr(rule->rulename));
1666                                 getRelationDescription(&buffer, rule->ev_class);
1667
1668                                 systable_endscan(rcscan);
1669                                 heap_close(ruleDesc, AccessShareLock);
1670                                 break;
1671                         }
1672
1673                 case OCLASS_TRIGGER:
1674                         {
1675                                 Relation        trigDesc;
1676                                 ScanKeyData skey[1];
1677                                 SysScanDesc tgscan;
1678                                 HeapTuple       tup;
1679                                 Form_pg_trigger trig;
1680
1681                                 trigDesc = heap_openr(TriggerRelationName, AccessShareLock);
1682
1683                                 ScanKeyInit(&skey[0],
1684                                                         ObjectIdAttributeNumber,
1685                                                         BTEqualStrategyNumber, F_OIDEQ,
1686                                                         ObjectIdGetDatum(object->objectId));
1687
1688                                 tgscan = systable_beginscan(trigDesc, TriggerOidIndex, true,
1689                                                                                         SnapshotNow, 1, skey);
1690
1691                                 tup = systable_getnext(tgscan);
1692
1693                                 if (!HeapTupleIsValid(tup))
1694                                         elog(ERROR, "could not find tuple for trigger %u",
1695                                                  object->objectId);
1696
1697                                 trig = (Form_pg_trigger) GETSTRUCT(tup);
1698
1699                                 appendStringInfo(&buffer, _("trigger %s on "),
1700                                                                  NameStr(trig->tgname));
1701                                 getRelationDescription(&buffer, trig->tgrelid);
1702
1703                                 systable_endscan(tgscan);
1704                                 heap_close(trigDesc, AccessShareLock);
1705                                 break;
1706                         }
1707
1708                 case OCLASS_SCHEMA:
1709                         {
1710                                 char       *nspname;
1711
1712                                 nspname = get_namespace_name(object->objectId);
1713                                 if (!nspname)
1714                                         elog(ERROR, "cache lookup failed for namespace %u",
1715                                                  object->objectId);
1716                                 appendStringInfo(&buffer, _("schema %s"), nspname);
1717                                 break;
1718                         }
1719
1720                 default:
1721                         appendStringInfo(&buffer, "unrecognized object %u %u %d",
1722                                                          object->classId,
1723                                                          object->objectId,
1724                                                          object->objectSubId);
1725                         break;
1726         }
1727
1728         return buffer.data;
1729 }
1730
1731 /*
1732  * subroutine for getObjectDescription: describe a relation
1733  */
1734 static void
1735 getRelationDescription(StringInfo buffer, Oid relid)
1736 {
1737         HeapTuple       relTup;
1738         Form_pg_class relForm;
1739         char       *nspname;
1740         char       *relname;
1741
1742         relTup = SearchSysCache(RELOID,
1743                                                         ObjectIdGetDatum(relid),
1744                                                         0, 0, 0);
1745         if (!HeapTupleIsValid(relTup))
1746                 elog(ERROR, "cache lookup failed for relation %u", relid);
1747         relForm = (Form_pg_class) GETSTRUCT(relTup);
1748
1749         /* Qualify the name if not visible in search path */
1750         if (RelationIsVisible(relid))
1751                 nspname = NULL;
1752         else
1753                 nspname = get_namespace_name(relForm->relnamespace);
1754
1755         relname = quote_qualified_identifier(nspname, NameStr(relForm->relname));
1756
1757         switch (relForm->relkind)
1758         {
1759                 case RELKIND_RELATION:
1760                         appendStringInfo(buffer, _("table %s"),
1761                                                          relname);
1762                         break;
1763                 case RELKIND_INDEX:
1764                         appendStringInfo(buffer, _("index %s"),
1765                                                          relname);
1766                         break;
1767                 case RELKIND_SPECIAL:
1768                         appendStringInfo(buffer, _("special system relation %s"),
1769                                                          relname);
1770                         break;
1771                 case RELKIND_SEQUENCE:
1772                         appendStringInfo(buffer, _("sequence %s"),
1773                                                          relname);
1774                         break;
1775                 case RELKIND_UNCATALOGED:
1776                         appendStringInfo(buffer, _("uncataloged table %s"),
1777                                                          relname);
1778                         break;
1779                 case RELKIND_TOASTVALUE:
1780                         appendStringInfo(buffer, _("toast table %s"),
1781                                                          relname);
1782                         break;
1783                 case RELKIND_VIEW:
1784                         appendStringInfo(buffer, _("view %s"),
1785                                                          relname);
1786                         break;
1787                 case RELKIND_COMPOSITE_TYPE:
1788                         appendStringInfo(buffer, _("composite type %s"),
1789                                                          relname);
1790                         break;
1791                 default:
1792                         /* shouldn't get here */
1793                         appendStringInfo(buffer, _("relation %s"),
1794                                                          relname);
1795                         break;
1796         }
1797
1798         ReleaseSysCache(relTup);
1799 }