]> granicus.if.org Git - postgresql/blob - src/backend/catalog/dependency.c
Back out ALTER DOMAIN patch until missing file appears.
[postgresql] / src / backend / catalog / dependency.c
1 /*-------------------------------------------------------------------------
2  *
3  * dependency.c
4  *        Routines to support inter-object dependencies.
5  *
6  *
7  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.16 2002/12/06 03:42:54 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "catalog/catname.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_attrdef.h"
26 #include "catalog/pg_cast.h"
27 #include "catalog/pg_constraint.h"
28 #include "catalog/pg_conversion.h"
29 #include "catalog/pg_depend.h"
30 #include "catalog/pg_language.h"
31 #include "catalog/pg_opclass.h"
32 #include "catalog/pg_rewrite.h"
33 #include "catalog/pg_trigger.h"
34 #include "commands/comment.h"
35 #include "commands/defrem.h"
36 #include "commands/proclang.h"
37 #include "commands/schemacmds.h"
38 #include "commands/trigger.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "optimizer/clauses.h"
42 #include "parser/parsetree.h"
43 #include "rewrite/rewriteRemove.h"
44 #include "utils/builtins.h"
45 #include "utils/fmgroids.h"
46 #include "utils/lsyscache.h"
47 #include "utils/syscache.h"
48
49
/*
 * This enum covers all system catalogs whose OIDs can appear in classid.
 *
 * MAX_OCLASS is not an object class of its own: it counts the entries that
 * precede it and is used to size the object_classes[] lookup table, which is
 * why it has to remain the final enumerator.
 */
typedef enum ObjectClasses
{
	OCLASS_CLASS,				/* pg_class */
	OCLASS_PROC,				/* pg_proc */
	OCLASS_TYPE,				/* pg_type */
	OCLASS_CAST,				/* pg_cast */
	OCLASS_CONSTRAINT,			/* pg_constraint */
	OCLASS_CONVERSION,			/* pg_conversion */
	OCLASS_DEFAULT,				/* pg_attrdef */
	OCLASS_LANGUAGE,			/* pg_language */
	OCLASS_OPERATOR,			/* pg_operator */
	OCLASS_OPCLASS,				/* pg_opclass */
	OCLASS_REWRITE,				/* pg_rewrite */
	OCLASS_TRIGGER,				/* pg_trigger */
	OCLASS_SCHEMA,				/* pg_namespace */
	MAX_OCLASS					/* MUST BE LAST */
} ObjectClasses;
68
/*
 * Expansible list of ObjectAddresses.
 *
 * The first numrefs entries of refs[] are in use; maxrefs is the number of
 * entries the palloc'd array currently has room for.
 */
typedef struct
{
	ObjectAddress *refs;		/* => palloc'd array */
	int			numrefs;		/* current number of references */
	int			maxrefs;		/* current size of palloc'd array */
} ObjectAddresses;
76
/*
 * Working state for find_expr_references_walker: the walker receives a
 * pointer to one of these as its context argument.
 */
typedef struct
{
	ObjectAddresses addrs;		/* addresses being accumulated */
	List	   *rtables;		/* list of rangetables to resolve Vars */
} find_expr_references_context;
83
84
/*
 * Because not all system catalogs have predetermined OIDs, we build a table
 * mapping between ObjectClasses and OIDs.  This is done at most once per
 * backend run, to minimize lookup overhead.
 *
 * object_classes[] has one slot per ObjectClasses value (MAX_OCLASS slots);
 * object_classes_initialized starts out false, i.e. the table has not been
 * filled in yet.
 */
static bool object_classes_initialized = false;
static Oid	object_classes[MAX_OCLASS];
92
93
/*
 * Forward declarations of the file-local (static) routines used below.
 */
static void findAutoDeletableObjects(const ObjectAddress *object,
									 ObjectAddresses *oktodelete,
									 Relation depRel);
static bool recursiveDeletion(const ObjectAddress *object,
				  DropBehavior behavior,
				  const ObjectAddress *callingObject,
				  ObjectAddresses *oktodelete,
				  Relation depRel);
static void doDeletion(const ObjectAddress *object);
static bool find_expr_references_walker(Node *node,
							find_expr_references_context *context);
static void eliminate_duplicate_dependencies(ObjectAddresses *addrs);
static int	object_address_comparator(const void *a, const void *b);
static void init_object_addresses(ObjectAddresses *addrs);
static void add_object_address(ObjectClasses oclass, Oid objectId, int32 subId,
				   ObjectAddresses *addrs);
static void add_exact_object_address(const ObjectAddress *object,
						 ObjectAddresses *addrs);
static bool object_address_present(const ObjectAddress *object,
					   ObjectAddresses *addrs);
static void term_object_addresses(ObjectAddresses *addrs);
static void init_object_classes(void);
static ObjectClasses getObjectClass(const ObjectAddress *object);
static char *getObjectDescription(const ObjectAddress *object);
static void getRelationDescription(StringInfo buffer, Oid relid);
119
120
121 /*
122  * performDeletion: attempt to drop the specified object.  If CASCADE
123  * behavior is specified, also drop any dependent objects (recursively).
124  * If RESTRICT behavior is specified, error out if there are any dependent
125  * objects, except for those that should be implicitly dropped anyway
126  * according to the dependency type.
127  *
128  * This is the outer control routine for all forms of DROP that drop objects
129  * that can participate in dependencies.
130  */
131 void
132 performDeletion(const ObjectAddress *object,
133                                 DropBehavior behavior)
134 {
135         char       *objDescription;
136         Relation        depRel;
137         ObjectAddresses oktodelete;
138
139         /*
140          * Get object description for possible use in failure message. Must do
141          * this before deleting it ...
142          */
143         objDescription = getObjectDescription(object);
144
145         /*
146          * We save some cycles by opening pg_depend just once and passing the
147          * Relation pointer down to all the recursive deletion steps.
148          */
149         depRel = heap_openr(DependRelationName, RowExclusiveLock);
150
151         /*
152          * Construct a list of objects that are reachable by AUTO or INTERNAL
153          * dependencies from the target object.  These should be deleted silently,
154          * even if the actual deletion pass first reaches one of them via a
155          * non-auto dependency.
156          */
157         init_object_addresses(&oktodelete);
158
159         findAutoDeletableObjects(object, &oktodelete, depRel);
160
161         if (!recursiveDeletion(object, behavior, NULL, &oktodelete, depRel))
162                 elog(ERROR, "Cannot drop %s because other objects depend on it"
163                          "\n\tUse DROP ... CASCADE to drop the dependent objects too",
164                          objDescription);
165
166         term_object_addresses(&oktodelete);
167
168         heap_close(depRel, RowExclusiveLock);
169
170         pfree(objDescription);
171 }
172
173
174 /*
175  * findAutoDeletableObjects: find all objects that are reachable by AUTO or
176  * INTERNAL dependency paths from the given object.  Add them all to the
177  * oktodelete list.  Note that the originally given object will also be
178  * added to the list.
179  *
180  * depRel is the already-open pg_depend relation.
181  */
182 static void
183 findAutoDeletableObjects(const ObjectAddress *object,
184                                                  ObjectAddresses *oktodelete,
185                                                  Relation depRel)
186 {
187         ScanKeyData key[3];
188         int                     nkeys;
189         SysScanDesc scan;
190         HeapTuple       tup;
191         ObjectAddress otherObject;
192
193         /*
194          * If this object is already in oktodelete, then we already visited it;
195          * don't do so again (this prevents infinite recursion if there's a loop
196          * in pg_depend).  Otherwise, add it.
197          */
198         if (object_address_present(object, oktodelete))
199                 return;
200         add_exact_object_address(object, oktodelete);
201
202         /*
203          * Scan pg_depend records that link to this object, showing the things
204          * that depend on it.  For each one that is AUTO or INTERNAL, visit the
205          * referencing object.
206          *
207          * When dropping a whole object (subId = 0), find pg_depend records for
208          * its sub-objects too.
209          */
210         ScanKeyEntryInitialize(&key[0], 0x0,
211                                                    Anum_pg_depend_refclassid, F_OIDEQ,
212                                                    ObjectIdGetDatum(object->classId));
213         ScanKeyEntryInitialize(&key[1], 0x0,
214                                                    Anum_pg_depend_refobjid, F_OIDEQ,
215                                                    ObjectIdGetDatum(object->objectId));
216         if (object->objectSubId != 0)
217         {
218                 ScanKeyEntryInitialize(&key[2], 0x0,
219                                                            Anum_pg_depend_refobjsubid, F_INT4EQ,
220                                                            Int32GetDatum(object->objectSubId));
221                 nkeys = 3;
222         }
223         else
224                 nkeys = 2;
225
226         scan = systable_beginscan(depRel, DependReferenceIndex, true,
227                                                           SnapshotNow, nkeys, key);
228
229         while (HeapTupleIsValid(tup = systable_getnext(scan)))
230         {
231                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
232
233                 switch (foundDep->deptype)
234                 {
235                         case DEPENDENCY_NORMAL:
236                                 /* ignore */
237                                 break;
238                         case DEPENDENCY_AUTO:
239                         case DEPENDENCY_INTERNAL:
240                                 /* recurse */
241                                 otherObject.classId = foundDep->classid;
242                                 otherObject.objectId = foundDep->objid;
243                                 otherObject.objectSubId = foundDep->objsubid;
244                                 findAutoDeletableObjects(&otherObject, oktodelete, depRel);
245                                 break;
246                         case DEPENDENCY_PIN:
247                                 /*
248                                  * For a PIN dependency we just elog immediately; there
249                                  * won't be any others to examine, and we aren't ever
250                                  * going to let the user delete it.
251                                  */
252                                 elog(ERROR, "Cannot drop %s because it is required by the database system",
253                                          getObjectDescription(object));
254                                 break;
255                         default:
256                                 elog(ERROR, "findAutoDeletableObjects: unknown dependency type '%c' for %s",
257                                          foundDep->deptype, getObjectDescription(object));
258                                 break;
259                 }
260         }
261
262         systable_endscan(scan);
263 }
264
265
/*
 * recursiveDeletion: delete a single object for performDeletion, plus
 * (recursively) anything that depends on it.
 *
 * Returns TRUE if successful, FALSE if not.
 *
 * object is the address of the object to delete at this recursion level.
 *
 * behavior is the requested drop behavior; DROP_RESTRICT turns dependencies
 * that are not auto-deletable into reported violations (see below), any
 * other value lets the drop cascade with a NOTICE.
 *
 * callingObject is NULL at the outer level, else identifies the object that
 * we recursed from (the reference object that someone else needs to delete).
 *
 * oktodelete is a list of objects verified deletable (ie, reachable by one
 * or more AUTO or INTERNAL dependencies from the original target).
 *
 * depRel is the already-open pg_depend relation.
 *
 *
 * In RESTRICT mode, we perform all the deletions anyway, but elog a NOTICE
 * and return FALSE if we find a restriction violation.  performDeletion
 * will then abort the transaction to nullify the deletions.  We have to
 * do it this way to (a) report all the direct and indirect dependencies
 * while (b) not going into infinite recursion if there's a cycle.
 *
 * This is even more complex than one could wish, because it is possible for
 * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL
 * dependencies.  Also, we might have a situation where we've been asked to
 * delete object A, and objects B and C both have AUTO dependencies on A,
 * but B also has a NORMAL dependency on C.  (Since any of these paths might
 * be indirect, we can't prevent these scenarios, but must cope instead.)
 * If we visit C before B then we would mistakenly decide that the B->C link
 * should prevent the restricted drop from occurring.  To handle this, we make
 * a pre-scan to find all the objects that are auto-deletable from A.  If we
 * visit C first, but B is present in the oktodelete list, then we make no
 * complaint but recurse to delete B anyway.  (Note that in general we must
 * delete B before deleting C; the drop routine for B may try to access C.)
 *
 * Note: in the case where the path to B is traversed first, we will not
 * see the NORMAL dependency when we reach C, because of the pg_depend
 * removals done in step 1.  The oktodelete list is necessary just
 * to make the behavior independent of the order in which pg_depend
 * entries are visited.
 */
static bool
recursiveDeletion(const ObjectAddress *object,
				  DropBehavior behavior,
				  const ObjectAddress *callingObject,
				  ObjectAddresses *oktodelete,
				  Relation depRel)
{
	bool		ok = true;
	char	   *objDescription;
	ScanKeyData key[3];
	int			nkeys;
	SysScanDesc scan;
	HeapTuple	tup;
	ObjectAddress otherObject;
	ObjectAddress owningObject;
	bool		amOwned = false;	/* set once an owning object is found */

	/*
	 * Get object description for possible use in messages.  Must do this
	 * before deleting it ...
	 */
	objDescription = getObjectDescription(object);

	/*
	 * Step 1: find and remove pg_depend records that link from this
	 * object to others.  We have to do this anyway, and doing it first
	 * ensures that we avoid infinite recursion in the case of cycles.
	 * Also, some dependency types require extra processing here.
	 *
	 * When dropping a whole object (subId = 0), remove all pg_depend records
	 * for its sub-objects too.
	 */
	ScanKeyEntryInitialize(&key[0], 0x0,
						   Anum_pg_depend_classid, F_OIDEQ,
						   ObjectIdGetDatum(object->classId));
	ScanKeyEntryInitialize(&key[1], 0x0,
						   Anum_pg_depend_objid, F_OIDEQ,
						   ObjectIdGetDatum(object->objectId));
	if (object->objectSubId != 0)
	{
		ScanKeyEntryInitialize(&key[2], 0x0,
							   Anum_pg_depend_objsubid, F_INT4EQ,
							   Int32GetDatum(object->objectSubId));
		nkeys = 3;
	}
	else
		nkeys = 2;

	scan = systable_beginscan(depRel, DependDependerIndex, true,
							  SnapshotNow, nkeys, key);

	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);

		/* In this scan, otherObject is the object that we depend on. */
		otherObject.classId = foundDep->refclassid;
		otherObject.objectId = foundDep->refobjid;
		otherObject.objectSubId = foundDep->refobjsubid;

		switch (foundDep->deptype)
		{
			case DEPENDENCY_NORMAL:
			case DEPENDENCY_AUTO:
				/* no problem */
				break;
			case DEPENDENCY_INTERNAL:

				/*
				 * This object is part of the internal implementation of
				 * another object.  We have three cases:
				 *
				 * 1. At the outermost recursion level, disallow the DROP.
				 * (We just elog here, rather than proceeding, since no
				 * other dependencies are likely to be interesting.)
				 */
				if (callingObject == NULL)
				{
					char	   *otherObjDesc = getObjectDescription(&otherObject);

					elog(ERROR, "Cannot drop %s because %s requires it"
						 "\n\tYou may drop %s instead",
						 objDescription, otherObjDesc, otherObjDesc);
				}

				/*
				 * 2. When recursing from the other end of this
				 * dependency, it's okay to continue with the deletion.
				 * This holds when recursing from a whole object that
				 * includes the nominal other end as a component, too.
				 */
				if (callingObject->classId == otherObject.classId &&
					callingObject->objectId == otherObject.objectId &&
				(callingObject->objectSubId == otherObject.objectSubId ||
				 callingObject->objectSubId == 0))
					break;

				/*
				 * 3. When recursing from anyplace else, transform this
				 * deletion request into a delete of the other object.
				 * (This will be an error condition iff RESTRICT mode.) In
				 * this case we finish deleting my dependencies except for
				 * the INTERNAL link, which will be needed to cause the
				 * owning object to recurse back to me.
				 */
				if (amOwned)	/* shouldn't happen */
					elog(ERROR, "recursiveDeletion: multiple INTERNAL dependencies for %s",
						 objDescription);
				owningObject = otherObject;
				amOwned = true;
				/* "continue" bypasses the simple_heap_delete call below */
				continue;
			case DEPENDENCY_PIN:

				/*
				 * Should not happen; PIN dependencies should have zeroes
				 * in the depender fields...
				 */
				elog(ERROR, "recursiveDeletion: incorrect use of PIN dependency with %s",
					 objDescription);
				break;
			default:
				elog(ERROR, "recursiveDeletion: unknown dependency type '%c' for %s",
					 foundDep->deptype, objDescription);
				break;
		}

		/* Remove the pg_depend row we just examined. */
		simple_heap_delete(depRel, &tup->t_self);
	}

	systable_endscan(scan);

	/*
	 * CommandCounterIncrement here to ensure that preceding changes are
	 * all visible; in particular, that the above deletions of pg_depend
	 * entries are visible.  That prevents infinite recursion in case of a
	 * dependency loop (which is perfectly legal).
	 */
	CommandCounterIncrement();

	/*
	 * If we found we are owned by another object, ask it to delete itself
	 * instead of proceeding.  Complain if RESTRICT mode, unless the other
	 * object is in oktodelete.
	 *
	 * Steps 2 and 3 are not executed on this path: we return right after
	 * the recursive call, relying on the INTERNAL link that step 1 left in
	 * place to bring the owning object's deletion back to this object.
	 */
	if (amOwned)
	{
		if (object_address_present(&owningObject, oktodelete))
			elog(DEBUG1, "Drop auto-cascades to %s",
				 getObjectDescription(&owningObject));
		else if (behavior == DROP_RESTRICT)
		{
			elog(NOTICE, "%s depends on %s",
				 getObjectDescription(&owningObject),
				 objDescription);
			ok = false;
		}
		else
			elog(NOTICE, "Drop cascades to %s",
				 getObjectDescription(&owningObject));

		if (!recursiveDeletion(&owningObject, behavior,
							   object,
							   oktodelete, depRel))
			ok = false;

		pfree(objDescription);

		return ok;
	}

	/*
	 * Step 2: scan pg_depend records that link to this object, showing
	 * the things that depend on it.  Recursively delete those things. (We
	 * don't delete the pg_depend records here, as the recursive call will
	 * do that.)  Note it's important to delete the dependent objects
	 * before the referenced one, since the deletion routines might do
	 * things like try to update the pg_class record when deleting a check
	 * constraint.
	 *
	 * Again, when dropping a whole object (subId = 0), find pg_depend
	 * records for its sub-objects too.
	 *
	 * NOTE: because we are using SnapshotNow, if a recursive call deletes
	 * any pg_depend tuples that our scan hasn't yet visited, we will not
	 * see them as good when we do visit them.  This is essential for
	 * correct behavior if there are multiple dependency paths between two
	 * objects --- else we might try to delete an already-deleted object.
	 */
	ScanKeyEntryInitialize(&key[0], 0x0,
						   Anum_pg_depend_refclassid, F_OIDEQ,
						   ObjectIdGetDatum(object->classId));
	ScanKeyEntryInitialize(&key[1], 0x0,
						   Anum_pg_depend_refobjid, F_OIDEQ,
						   ObjectIdGetDatum(object->objectId));
	if (object->objectSubId != 0)
	{
		ScanKeyEntryInitialize(&key[2], 0x0,
							   Anum_pg_depend_refobjsubid, F_INT4EQ,
							   Int32GetDatum(object->objectSubId));
		nkeys = 3;
	}
	else
		nkeys = 2;

	scan = systable_beginscan(depRel, DependReferenceIndex, true,
							  SnapshotNow, nkeys, key);

	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);

		/* In this scan, otherObject is the object that depends on us. */
		otherObject.classId = foundDep->classid;
		otherObject.objectId = foundDep->objid;
		otherObject.objectSubId = foundDep->objsubid;

		switch (foundDep->deptype)
		{
			case DEPENDENCY_NORMAL:
				/*
				 * Perhaps there was another dependency path that would
				 * have allowed silent deletion of the otherObject, had
				 * we only taken that path first.
				 * In that case, act like this link is AUTO, too.
				 */
				if (object_address_present(&otherObject, oktodelete))
					elog(DEBUG1, "Drop auto-cascades to %s",
						 getObjectDescription(&otherObject));
				else if (behavior == DROP_RESTRICT)
				{
					elog(NOTICE, "%s depends on %s",
						 getObjectDescription(&otherObject),
						 objDescription);
					ok = false;
				}
				else
					elog(NOTICE, "Drop cascades to %s",
						 getObjectDescription(&otherObject));

				/*
				 * The dependent object is deleted in every case, even after
				 * a RESTRICT violation has been noted.
				 */
				if (!recursiveDeletion(&otherObject, behavior,
									   object,
									   oktodelete, depRel))
					ok = false;
				break;
			case DEPENDENCY_AUTO:
			case DEPENDENCY_INTERNAL:

				/*
				 * We propagate the DROP without complaint even in the
				 * RESTRICT case.  (However, normal dependencies on the
				 * component object could still cause failure.)
				 */
				elog(DEBUG1, "Drop auto-cascades to %s",
					 getObjectDescription(&otherObject));

				if (!recursiveDeletion(&otherObject, behavior,
									   object,
									   oktodelete, depRel))
					ok = false;
				break;
			case DEPENDENCY_PIN:

				/*
				 * For a PIN dependency we just elog immediately; there
				 * won't be any others to report.
				 */
				elog(ERROR, "Cannot drop %s because it is required by the database system",
					 objDescription);
				break;
			default:
				elog(ERROR, "recursiveDeletion: unknown dependency type '%c' for %s",
					 foundDep->deptype, objDescription);
				break;
		}
	}

	systable_endscan(scan);

	/*
	 * We do not need CommandCounterIncrement here, since if step 2 did
	 * anything then each recursive call will have ended with one.
	 */

	/*
	 * Step 3: delete the object itself.
	 */
	doDeletion(object);

	/*
	 * Delete any comments associated with this object.  (This is a
	 * convenient place to do it instead of having every object type know
	 * to do it.)
	 */
	DeleteComments(object->objectId, object->classId, object->objectSubId);

	/*
	 * CommandCounterIncrement here to ensure that preceding changes are
	 * all visible.
	 */
	CommandCounterIncrement();

	/*
	 * And we're done!
	 */
	pfree(objDescription);

	return ok;
}
613
614
615 /*
616  * doDeletion: actually delete a single object
617  */
618 static void
619 doDeletion(const ObjectAddress *object)
620 {
621         switch (getObjectClass(object))
622         {
623                 case OCLASS_CLASS:
624                         {
625                                 char            relKind = get_rel_relkind(object->objectId);
626
627                                 if (relKind == RELKIND_INDEX)
628                                 {
629                                         Assert(object->objectSubId == 0);
630                                         index_drop(object->objectId);
631                                 }
632                                 else
633                                 {
634                                         if (object->objectSubId != 0)
635                                                 RemoveAttributeById(object->objectId,
636                                                                                         object->objectSubId);
637                                         else
638                                                 heap_drop_with_catalog(object->objectId);
639                                 }
640                                 break;
641                         }
642
643                 case OCLASS_PROC:
644                         RemoveFunctionById(object->objectId);
645                         break;
646
647                 case OCLASS_TYPE:
648                         RemoveTypeById(object->objectId);
649                         break;
650
651                 case OCLASS_CAST:
652                         DropCastById(object->objectId);
653                         break;
654
655                 case OCLASS_CONSTRAINT:
656                         RemoveConstraintById(object->objectId);
657                         break;
658
659                 case OCLASS_CONVERSION:
660                         RemoveConversionById(object->objectId);
661                         break;
662
663                 case OCLASS_DEFAULT:
664                         RemoveAttrDefaultById(object->objectId);
665                         break;
666
667                 case OCLASS_LANGUAGE:
668                         DropProceduralLanguageById(object->objectId);
669                         break;
670
671                 case OCLASS_OPERATOR:
672                         RemoveOperatorById(object->objectId);
673                         break;
674
675                 case OCLASS_OPCLASS:
676                         RemoveOpClassById(object->objectId);
677                         break;
678
679                 case OCLASS_REWRITE:
680                         RemoveRewriteRuleById(object->objectId);
681                         break;
682
683                 case OCLASS_TRIGGER:
684                         RemoveTriggerById(object->objectId);
685                         break;
686
687                 case OCLASS_SCHEMA:
688                         RemoveSchemaById(object->objectId);
689                         break;
690
691                 default:
692                         elog(ERROR, "doDeletion: Unsupported object class %u",
693                                  object->classId);
694         }
695 }
696
697 /*
698  * recordDependencyOnExpr - find expression dependencies
699  *
700  * This is used to find the dependencies of rules, constraint expressions,
701  * etc.
702  *
703  * Given an expression or query in node-tree form, find all the objects
704  * it refers to (tables, columns, operators, functions, etc).  Record
705  * a dependency of the specified type from the given depender object
706  * to each object mentioned in the expression.
707  *
708  * rtable is the rangetable to be used to interpret Vars with varlevelsup=0.
709  * It can be NIL if no such variables are expected.
710  *
711  * XXX is it important to create dependencies on the datatypes mentioned in
712  * the expression?      In most cases this would be redundant (eg, a ref to an
713  * operator indirectly references its input and output datatypes), but I'm
714  * not quite convinced there are no cases where we need it.
715  */
716 void
717 recordDependencyOnExpr(const ObjectAddress *depender,
718                                            Node *expr, List *rtable,
719                                            DependencyType behavior)
720 {
721         find_expr_references_context context;
722
723         init_object_addresses(&context.addrs);
724
725         /* Set up interpretation for Vars at varlevelsup = 0 */
726         context.rtables = makeList1(rtable);
727
728         /* Scan the expression tree for referenceable objects */
729         find_expr_references_walker(expr, &context);
730
731         /* Remove any duplicates */
732         eliminate_duplicate_dependencies(&context.addrs);
733
734         /* And record 'em */
735         recordMultipleDependencies(depender,
736                                                            context.addrs.refs, context.addrs.numrefs,
737                                                            behavior);
738
739         term_object_addresses(&context.addrs);
740 }
741
742 /*
743  * Recursively search an expression tree for object references.
744  *
745  * Note: we avoid creating references to columns of tables that participate
746  * in an SQL JOIN construct, but are not actually used anywhere in the query.
747  * To do so, we do not scan the joinaliasvars list of a join RTE while
748  * scanning the query rangetable, but instead scan each individual entry
749  * of the alias list when we find a reference to it.
750  */
751 static bool
752 find_expr_references_walker(Node *node,
753                                                         find_expr_references_context *context)
754 {
755         if (node == NULL)
756                 return false;
757         if (IsA(node, Var))
758         {
759                 Var                *var = (Var *) node;
760                 int                     levelsup;
761                 List       *rtable,
762                                    *rtables;
763                 RangeTblEntry *rte;
764
765                 /* Find matching rtable entry, or complain if not found */
766                 levelsup = var->varlevelsup;
767                 rtables = context->rtables;
768                 while (levelsup--)
769                 {
770                         if (rtables == NIL)
771                                 break;
772                         rtables = lnext(rtables);
773                 }
774                 if (rtables == NIL)
775                         elog(ERROR, "find_expr_references_walker: bogus varlevelsup %d",
776                                  var->varlevelsup);
777                 rtable = lfirst(rtables);
778                 if (var->varno <= 0 || var->varno > length(rtable))
779                         elog(ERROR, "find_expr_references_walker: bogus varno %d",
780                                  var->varno);
781                 rte = rt_fetch(var->varno, rtable);
782                 if (rte->rtekind == RTE_RELATION)
783                 {
784                         /* If it's a plain relation, reference this column */
785                         /* NB: this code works for whole-row Var with attno 0, too */
786                         add_object_address(OCLASS_CLASS, rte->relid, var->varattno,
787                                                            &context->addrs);
788                 }
789                 else if (rte->rtekind == RTE_JOIN)
790                 {
791                         /* Scan join output column to add references to join inputs */
792                         List   *save_rtables;
793
794                         /* We must make the context appropriate for join's level */
795                         save_rtables = context->rtables;
796                         context->rtables = rtables;
797                         if (var->varattno <= 0 ||
798                                 var->varattno > length(rte->joinaliasvars))
799                                 elog(ERROR, "find_expr_references_walker: bogus varattno %d",
800                                          var->varattno);
801                         find_expr_references_walker((Node *) nth(var->varattno - 1,
802                                                                                                          rte->joinaliasvars),
803                                                                                 context);
804                         context->rtables = save_rtables;
805                 }
806                 return false;
807         }
808         if (IsA(node, Expr))
809         {
810                 Expr       *expr = (Expr *) node;
811
812                 if (expr->opType == OP_EXPR ||
813                         expr->opType == DISTINCT_EXPR)
814                 {
815                         Oper       *oper = (Oper *) expr->oper;
816
817                         add_object_address(OCLASS_OPERATOR, oper->opno, 0,
818                                                            &context->addrs);
819                 }
820                 else if (expr->opType == FUNC_EXPR)
821                 {
822                         Func       *func = (Func *) expr->oper;
823
824                         add_object_address(OCLASS_PROC, func->funcid, 0,
825                                                            &context->addrs);
826                 }
827                 /* fall through to examine arguments */
828         }
829         if (IsA(node, Aggref))
830         {
831                 Aggref     *aggref = (Aggref *) node;
832
833                 add_object_address(OCLASS_PROC, aggref->aggfnoid, 0,
834                                                    &context->addrs);
835                 /* fall through to examine arguments */
836         }
837         if (is_subplan(node))
838         {
839                 /* Extra work needed here if we ever need this case */
840                 elog(ERROR, "find_expr_references_walker: already-planned subqueries not supported");
841         }
842         if (IsA(node, Query))
843         {
844                 /* Recurse into RTE subquery or not-yet-planned sublink subquery */
845                 Query      *query = (Query *) node;
846                 List       *rtable;
847                 bool            result;
848
849                 /*
850                  * Add whole-relation refs for each plain relation mentioned in
851                  * the subquery's rtable.  (Note: query_tree_walker takes care of
852                  * recursing into RTE_FUNCTION and RTE_SUBQUERY RTEs, so no need
853                  * to do that here.  But keep it from looking at join alias lists.)
854                  */
855                 foreach(rtable, query->rtable)
856                 {
857                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
858
859                         if (rte->rtekind == RTE_RELATION)
860                                 add_object_address(OCLASS_CLASS, rte->relid, 0,
861                                                                    &context->addrs);
862                 }
863
864                 /* Examine substructure of query */
865                 context->rtables = lcons(query->rtable, context->rtables);
866                 result = query_tree_walker(query,
867                                                                    find_expr_references_walker,
868                                                                    (void *) context,
869                                                                    QTW_IGNORE_JOINALIASES);
870                 context->rtables = lnext(context->rtables);
871                 return result;
872         }
873         return expression_tree_walker(node, find_expr_references_walker,
874                                                                   (void *) context);
875 }
876
877 /*
878  * Given an array of dependency references, eliminate any duplicates.
879  */
880 static void
881 eliminate_duplicate_dependencies(ObjectAddresses *addrs)
882 {
883         ObjectAddress *priorobj;
884         int                     oldref,
885                                 newrefs;
886
887         if (addrs->numrefs <= 1)
888                 return;                                 /* nothing to do */
889
890         /* Sort the refs so that duplicates are adjacent */
891         qsort((void *) addrs->refs, addrs->numrefs, sizeof(ObjectAddress),
892                   object_address_comparator);
893
894         /* Remove dups */
895         priorobj = addrs->refs;
896         newrefs = 1;
897         for (oldref = 1; oldref < addrs->numrefs; oldref++)
898         {
899                 ObjectAddress *thisobj = addrs->refs + oldref;
900
901                 if (priorobj->classId == thisobj->classId &&
902                         priorobj->objectId == thisobj->objectId)
903                 {
904                         if (priorobj->objectSubId == thisobj->objectSubId)
905                                 continue;               /* identical, so drop thisobj */
906
907                         /*
908                          * If we have a whole-object reference and a reference to a
909                          * part of the same object, we don't need the whole-object
910                          * reference (for example, we don't need to reference both
911                          * table foo and column foo.bar).  The whole-object reference
912                          * will always appear first in the sorted list.
913                          */
914                         if (priorobj->objectSubId == 0)
915                         {
916                                 /* replace whole ref with partial */
917                                 priorobj->objectSubId = thisobj->objectSubId;
918                                 continue;
919                         }
920                 }
921                 /* Not identical, so add thisobj to output set */
922                 priorobj++;
923                 priorobj->classId = thisobj->classId;
924                 priorobj->objectId = thisobj->objectId;
925                 priorobj->objectSubId = thisobj->objectSubId;
926                 newrefs++;
927         }
928
929         addrs->numrefs = newrefs;
930 }
931
932 /*
933  * qsort comparator for ObjectAddress items
934  */
935 static int
936 object_address_comparator(const void *a, const void *b)
937 {
938         const ObjectAddress *obja = (const ObjectAddress *) a;
939         const ObjectAddress *objb = (const ObjectAddress *) b;
940
941         if (obja->classId < objb->classId)
942                 return -1;
943         if (obja->classId > objb->classId)
944                 return 1;
945         if (obja->objectId < objb->objectId)
946                 return -1;
947         if (obja->objectId > objb->objectId)
948                 return 1;
949
950         /*
951          * We sort the subId as an unsigned int so that 0 will come first. See
952          * logic in eliminate_duplicate_dependencies.
953          */
954         if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId)
955                 return -1;
956         if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId)
957                 return 1;
958         return 0;
959 }
960
961 /*
962  * Routines for handling an expansible array of ObjectAddress items.
963  *
964  * init_object_addresses: initialize an ObjectAddresses array.
965  */
966 static void
967 init_object_addresses(ObjectAddresses *addrs)
968 {
969         /* Initialize array to empty */
970         addrs->numrefs = 0;
971         addrs->maxrefs = 32;            /* arbitrary initial array size */
972         addrs->refs = (ObjectAddress *)
973                 palloc(addrs->maxrefs * sizeof(ObjectAddress));
974
975         /* Initialize object_classes[] if not done yet */
976         /* This will be needed by add_object_address() */
977         if (!object_classes_initialized)
978                 init_object_classes();
979 }
980
981 /*
982  * Add an entry to an ObjectAddresses array.
983  *
984  * It is convenient to specify the class by ObjectClass rather than directly
985  * by catalog OID.
986  */
987 static void
988 add_object_address(ObjectClasses oclass, Oid objectId, int32 subId,
989                                    ObjectAddresses *addrs)
990 {
991         ObjectAddress *item;
992
993         /* enlarge array if needed */
994         if (addrs->numrefs >= addrs->maxrefs)
995         {
996                 addrs->maxrefs *= 2;
997                 addrs->refs = (ObjectAddress *)
998                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
999         }
1000         /* record this item */
1001         item = addrs->refs + addrs->numrefs;
1002         item->classId = object_classes[oclass];
1003         item->objectId = objectId;
1004         item->objectSubId = subId;
1005         addrs->numrefs++;
1006 }
1007
1008 /*
1009  * Add an entry to an ObjectAddresses array.
1010  *
1011  * As above, but specify entry exactly.
1012  */
1013 static void
1014 add_exact_object_address(const ObjectAddress *object,
1015                                                  ObjectAddresses *addrs)
1016 {
1017         ObjectAddress *item;
1018
1019         /* enlarge array if needed */
1020         if (addrs->numrefs >= addrs->maxrefs)
1021         {
1022                 addrs->maxrefs *= 2;
1023                 addrs->refs = (ObjectAddress *)
1024                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1025         }
1026         /* record this item */
1027         item = addrs->refs + addrs->numrefs;
1028         *item = *object;
1029         addrs->numrefs++;
1030 }
1031
1032 /*
1033  * Test whether an object is present in an ObjectAddresses array.
1034  *
1035  * We return "true" if object is a subobject of something in the array, too.
1036  */
1037 static bool
1038 object_address_present(const ObjectAddress *object,
1039                                            ObjectAddresses *addrs)
1040 {
1041         int                     i;
1042
1043         for (i = addrs->numrefs - 1; i >= 0; i--)
1044         {
1045                 ObjectAddress *thisobj = addrs->refs + i;
1046
1047                 if (object->classId == thisobj->classId &&
1048                         object->objectId == thisobj->objectId)
1049                 {
1050                         if (object->objectSubId == thisobj->objectSubId ||
1051                                 thisobj->objectSubId == 0)
1052                                 return true;
1053                 }
1054         }
1055
1056         return false;
1057 }
1058
1059 /*
1060  * Clean up when done with an ObjectAddresses array.
1061  */
1062 static void
1063 term_object_addresses(ObjectAddresses *addrs)
1064 {
1065         pfree(addrs->refs);
1066 }
1067
1068 /*
1069  * Initialize the object_classes[] table.
1070  *
1071  * Although some of these OIDs aren't compile-time constants, they surely
1072  * shouldn't change during a backend's run.  So, we look them up the
1073  * first time through and then cache them.
1074  */
1075 static void
1076 init_object_classes(void)
1077 {
1078         object_classes[OCLASS_CLASS] = RelOid_pg_class;
1079         object_classes[OCLASS_PROC] = RelOid_pg_proc;
1080         object_classes[OCLASS_TYPE] = RelOid_pg_type;
1081         object_classes[OCLASS_CAST] = get_system_catalog_relid(CastRelationName);
1082         object_classes[OCLASS_CONSTRAINT] = get_system_catalog_relid(ConstraintRelationName);
1083         object_classes[OCLASS_CONVERSION] = get_system_catalog_relid(ConversionRelationName);
1084         object_classes[OCLASS_DEFAULT] = get_system_catalog_relid(AttrDefaultRelationName);
1085         object_classes[OCLASS_LANGUAGE] = get_system_catalog_relid(LanguageRelationName);
1086         object_classes[OCLASS_OPERATOR] = get_system_catalog_relid(OperatorRelationName);
1087         object_classes[OCLASS_OPCLASS] = get_system_catalog_relid(OperatorClassRelationName);
1088         object_classes[OCLASS_REWRITE] = get_system_catalog_relid(RewriteRelationName);
1089         object_classes[OCLASS_TRIGGER] = get_system_catalog_relid(TriggerRelationName);
1090         object_classes[OCLASS_SCHEMA] = get_system_catalog_relid(NamespaceRelationName);
1091         object_classes_initialized = true;
1092 }
1093
1094 /*
1095  * Determine the class of a given object identified by objectAddress.
1096  *
1097  * This function is needed just because some of the system catalogs do
1098  * not have hardwired-at-compile-time OIDs.
1099  */
1100 static ObjectClasses
1101 getObjectClass(const ObjectAddress *object)
1102 {
1103         /* Easy for the bootstrapped catalogs... */
1104         switch (object->classId)
1105         {
1106                 case RelOid_pg_class:
1107                         /* caller must check objectSubId */
1108                         return OCLASS_CLASS;
1109
1110                 case RelOid_pg_proc:
1111                         Assert(object->objectSubId == 0);
1112                         return OCLASS_PROC;
1113
1114                 case RelOid_pg_type:
1115                         Assert(object->objectSubId == 0);
1116                         return OCLASS_TYPE;
1117         }
1118
1119         /*
1120          * Handle cases where catalog's OID is not hardwired.
1121          */
1122         if (!object_classes_initialized)
1123                 init_object_classes();
1124
1125         if (object->classId == object_classes[OCLASS_CAST])
1126         {
1127                 Assert(object->objectSubId == 0);
1128                 return OCLASS_CAST;
1129         }
1130         if (object->classId == object_classes[OCLASS_CONSTRAINT])
1131         {
1132                 Assert(object->objectSubId == 0);
1133                 return OCLASS_CONSTRAINT;
1134         }
1135         if (object->classId == object_classes[OCLASS_CONVERSION])
1136         {
1137                 Assert(object->objectSubId == 0);
1138                 return OCLASS_CONVERSION;
1139         }
1140         if (object->classId == object_classes[OCLASS_DEFAULT])
1141         {
1142                 Assert(object->objectSubId == 0);
1143                 return OCLASS_DEFAULT;
1144         }
1145         if (object->classId == object_classes[OCLASS_LANGUAGE])
1146         {
1147                 Assert(object->objectSubId == 0);
1148                 return OCLASS_LANGUAGE;
1149         }
1150         if (object->classId == object_classes[OCLASS_OPERATOR])
1151         {
1152                 Assert(object->objectSubId == 0);
1153                 return OCLASS_OPERATOR;
1154         }
1155         if (object->classId == object_classes[OCLASS_OPCLASS])
1156         {
1157                 Assert(object->objectSubId == 0);
1158                 return OCLASS_OPCLASS;
1159         }
1160         if (object->classId == object_classes[OCLASS_REWRITE])
1161         {
1162                 Assert(object->objectSubId == 0);
1163                 return OCLASS_REWRITE;
1164         }
1165         if (object->classId == object_classes[OCLASS_TRIGGER])
1166         {
1167                 Assert(object->objectSubId == 0);
1168                 return OCLASS_TRIGGER;
1169         }
1170         if (object->classId == object_classes[OCLASS_SCHEMA])
1171         {
1172                 Assert(object->objectSubId == 0);
1173                 return OCLASS_SCHEMA;
1174         }
1175
1176         elog(ERROR, "getObjectClass: Unknown object class %u",
1177                  object->classId);
1178         return OCLASS_CLASS;            /* keep compiler quiet */
1179 }
1180
1181 /*
1182  * getObjectDescription: build an object description for messages
1183  *
1184  * The result is a palloc'd string.
1185  */
1186 static char *
1187 getObjectDescription(const ObjectAddress *object)
1188 {
1189         StringInfoData buffer;
1190
1191         initStringInfo(&buffer);
1192
1193         switch (getObjectClass(object))
1194         {
1195                 case OCLASS_CLASS:
1196                         getRelationDescription(&buffer, object->objectId);
1197                         if (object->objectSubId != 0)
1198                                 appendStringInfo(&buffer, " column %s",
1199                                                                  get_attname(object->objectId,
1200                                                                                          object->objectSubId));
1201                         break;
1202
1203                 case OCLASS_PROC:
1204                         appendStringInfo(&buffer, "function %s",
1205                                                          format_procedure(object->objectId));
1206                         break;
1207
1208                 case OCLASS_TYPE:
1209                         appendStringInfo(&buffer, "type %s",
1210                                                          format_type_be(object->objectId));
1211                         break;
1212
1213                 case OCLASS_CAST:
1214                         {
1215                                 Relation        castDesc;
1216                                 ScanKeyData skey[1];
1217                                 SysScanDesc rcscan;
1218                                 HeapTuple       tup;
1219                                 Form_pg_cast castForm;
1220
1221                                 castDesc = heap_openr(CastRelationName, AccessShareLock);
1222
1223                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1224                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1225                                                                          ObjectIdGetDatum(object->objectId));
1226
1227                                 rcscan = systable_beginscan(castDesc, CastOidIndex, true,
1228                                                                                         SnapshotNow, 1, skey);
1229
1230                                 tup = systable_getnext(rcscan);
1231
1232                                 if (!HeapTupleIsValid(tup))
1233                                         elog(ERROR, "getObjectDescription: Cast %u does not exist",
1234                                                  object->objectId);
1235
1236                                 castForm = (Form_pg_cast) GETSTRUCT(tup);
1237
1238                                 appendStringInfo(&buffer, "cast from %s to %s",
1239                                                                  format_type_be(castForm->castsource),
1240                                                                  format_type_be(castForm->casttarget));
1241
1242                                 systable_endscan(rcscan);
1243                                 heap_close(castDesc, AccessShareLock);
1244                                 break;
1245                         }
1246
1247                 case OCLASS_CONSTRAINT:
1248                         {
1249                                 Relation        conDesc;
1250                                 ScanKeyData skey[1];
1251                                 SysScanDesc rcscan;
1252                                 HeapTuple       tup;
1253                                 Form_pg_constraint con;
1254
1255                                 conDesc = heap_openr(ConstraintRelationName, AccessShareLock);
1256
1257                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1258                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1259                                                                          ObjectIdGetDatum(object->objectId));
1260
1261                                 rcscan = systable_beginscan(conDesc, ConstraintOidIndex, true,
1262                                                                                         SnapshotNow, 1, skey);
1263
1264                                 tup = systable_getnext(rcscan);
1265
1266                                 if (!HeapTupleIsValid(tup))
1267                                         elog(ERROR, "getObjectDescription: Constraint %u does not exist",
1268                                                  object->objectId);
1269
1270                                 con = (Form_pg_constraint) GETSTRUCT(tup);
1271
1272                                 if (OidIsValid(con->conrelid))
1273                                 {
1274                                         appendStringInfo(&buffer, "constraint %s on ",
1275                                                                          NameStr(con->conname));
1276                                         getRelationDescription(&buffer, con->conrelid);
1277                                 }
1278                                 else
1279                                 {
1280                                         appendStringInfo(&buffer, "constraint %s",
1281                                                                          NameStr(con->conname));
1282                                 }
1283
1284                                 systable_endscan(rcscan);
1285                                 heap_close(conDesc, AccessShareLock);
1286                                 break;
1287                         }
1288
1289                 case OCLASS_CONVERSION:
1290                         {
1291                                 HeapTuple       conTup;
1292
1293                                 conTup = SearchSysCache(CONOID,
1294                                                                           ObjectIdGetDatum(object->objectId),
1295                                                                                 0, 0, 0);
1296                                 if (!HeapTupleIsValid(conTup))
1297                                         elog(ERROR, "getObjectDescription: Conversion %u does not exist",
1298                                                  object->objectId);
1299                                 appendStringInfo(&buffer, "conversion %s",
1300                                                                  NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname));
1301                                 ReleaseSysCache(conTup);
1302                                 break;
1303                         }
1304
1305                 case OCLASS_DEFAULT:
1306                         {
1307                                 Relation        attrdefDesc;
1308                                 ScanKeyData skey[1];
1309                                 SysScanDesc adscan;
1310                                 HeapTuple       tup;
1311                                 Form_pg_attrdef attrdef;
1312                                 ObjectAddress colobject;
1313
1314                                 attrdefDesc = heap_openr(AttrDefaultRelationName, AccessShareLock);
1315
1316                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1317                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1318                                                                          ObjectIdGetDatum(object->objectId));
1319
1320                                 adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndex, true,
1321                                                                                         SnapshotNow, 1, skey);
1322
1323                                 tup = systable_getnext(adscan);
1324
1325                                 if (!HeapTupleIsValid(tup))
1326                                         elog(ERROR, "getObjectDescription: Default %u does not exist",
1327                                                  object->objectId);
1328
1329                                 attrdef = (Form_pg_attrdef) GETSTRUCT(tup);
1330
1331                                 colobject.classId = RelOid_pg_class;
1332                                 colobject.objectId = attrdef->adrelid;
1333                                 colobject.objectSubId = attrdef->adnum;
1334
1335                                 appendStringInfo(&buffer, "default for %s",
1336                                                                  getObjectDescription(&colobject));
1337
1338                                 systable_endscan(adscan);
1339                                 heap_close(attrdefDesc, AccessShareLock);
1340                                 break;
1341                         }
1342
1343                 case OCLASS_LANGUAGE:
1344                         {
1345                                 HeapTuple       langTup;
1346
1347                                 langTup = SearchSysCache(LANGOID,
1348                                                                           ObjectIdGetDatum(object->objectId),
1349                                                                                  0, 0, 0);
1350                                 if (!HeapTupleIsValid(langTup))
1351                                         elog(ERROR, "getObjectDescription: Language %u does not exist",
1352                                                  object->objectId);
1353                                 appendStringInfo(&buffer, "language %s",
1354                                                                  NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname));
1355                                 ReleaseSysCache(langTup);
1356                                 break;
1357                         }
1358
1359                 case OCLASS_OPERATOR:
1360                         appendStringInfo(&buffer, "operator %s",
1361                                                          format_operator(object->objectId));
1362                         break;
1363
1364                 case OCLASS_OPCLASS:
1365                         {
1366                                 HeapTuple       opcTup;
1367                                 Form_pg_opclass opcForm;
1368                                 HeapTuple       amTup;
1369                                 Form_pg_am      amForm;
1370                                 char       *nspname;
1371
1372                                 opcTup = SearchSysCache(CLAOID,
1373                                                                           ObjectIdGetDatum(object->objectId),
1374                                                                                 0, 0, 0);
1375                                 if (!HeapTupleIsValid(opcTup))
1376                                         elog(ERROR, "cache lookup of opclass %u failed",
1377                                                  object->objectId);
1378                                 opcForm = (Form_pg_opclass) GETSTRUCT(opcTup);
1379
1380                                 /* Qualify the name if not visible in search path */
1381                                 if (OpclassIsVisible(object->objectId))
1382                                         nspname = NULL;
1383                                 else
1384                                         nspname = get_namespace_name(opcForm->opcnamespace);
1385
1386                                 appendStringInfo(&buffer, "operator class %s",
1387                                                                  quote_qualified_identifier(nspname,
1388                                                                                          NameStr(opcForm->opcname)));
1389
1390                                 amTup = SearchSysCache(AMOID,
1391                                                                            ObjectIdGetDatum(opcForm->opcamid),
1392                                                                            0, 0, 0);
1393                                 if (!HeapTupleIsValid(amTup))
1394                                         elog(ERROR, "syscache lookup for AM %u failed",
1395                                                  opcForm->opcamid);
1396                                 amForm = (Form_pg_am) GETSTRUCT(amTup);
1397
1398                                 appendStringInfo(&buffer, " for %s",
1399                                                                  NameStr(amForm->amname));
1400
1401                                 ReleaseSysCache(amTup);
1402                                 ReleaseSysCache(opcTup);
1403                                 break;
1404                         }
1405
1406                 case OCLASS_REWRITE:
1407                         {
1408                                 Relation        ruleDesc;
1409                                 ScanKeyData skey[1];
1410                                 SysScanDesc rcscan;
1411                                 HeapTuple       tup;
1412                                 Form_pg_rewrite rule;
1413
1414                                 ruleDesc = heap_openr(RewriteRelationName, AccessShareLock);
1415
1416                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1417                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1418                                                                          ObjectIdGetDatum(object->objectId));
1419
1420                                 rcscan = systable_beginscan(ruleDesc, RewriteOidIndex, true,
1421                                                                                         SnapshotNow, 1, skey);
1422
1423                                 tup = systable_getnext(rcscan);
1424
1425                                 if (!HeapTupleIsValid(tup))
1426                                         elog(ERROR, "getObjectDescription: Rule %u does not exist",
1427                                                  object->objectId);
1428
1429                                 rule = (Form_pg_rewrite) GETSTRUCT(tup);
1430
1431                                 appendStringInfo(&buffer, "rule %s on ",
1432                                                                  NameStr(rule->rulename));
1433                                 getRelationDescription(&buffer, rule->ev_class);
1434
1435                                 systable_endscan(rcscan);
1436                                 heap_close(ruleDesc, AccessShareLock);
1437                                 break;
1438                         }
1439
1440                 case OCLASS_TRIGGER:
1441                         {
1442                                 Relation        trigDesc;
1443                                 ScanKeyData skey[1];
1444                                 SysScanDesc tgscan;
1445                                 HeapTuple       tup;
1446                                 Form_pg_trigger trig;
1447
1448                                 trigDesc = heap_openr(TriggerRelationName, AccessShareLock);
1449
1450                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1451                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1452                                                                          ObjectIdGetDatum(object->objectId));
1453
1454                                 tgscan = systable_beginscan(trigDesc, TriggerOidIndex, true,
1455                                                                                         SnapshotNow, 1, skey);
1456
1457                                 tup = systable_getnext(tgscan);
1458
1459                                 if (!HeapTupleIsValid(tup))
1460                                         elog(ERROR, "getObjectDescription: Trigger %u does not exist",
1461                                                  object->objectId);
1462
1463                                 trig = (Form_pg_trigger) GETSTRUCT(tup);
1464
1465                                 appendStringInfo(&buffer, "trigger %s on ",
1466                                                                  NameStr(trig->tgname));
1467                                 getRelationDescription(&buffer, trig->tgrelid);
1468
1469                                 systable_endscan(tgscan);
1470                                 heap_close(trigDesc, AccessShareLock);
1471                                 break;
1472                         }
1473
1474                 case OCLASS_SCHEMA:
1475                         {
1476                                 char       *nspname;
1477
1478                                 nspname = get_namespace_name(object->objectId);
1479                                 if (!nspname)
1480                                         elog(ERROR, "getObjectDescription: Schema %u does not exist",
1481                                                  object->objectId);
1482                                 appendStringInfo(&buffer, "schema %s", nspname);
1483                                 break;
1484                         }
1485
1486                 default:
1487                         appendStringInfo(&buffer, "unknown object %u %u %d",
1488                                                          object->classId,
1489                                                          object->objectId,
1490                                                          object->objectSubId);
1491                         break;
1492         }
1493
1494         return buffer.data;
1495 }
1496
1497 /*
1498  * subroutine for getObjectDescription: describe a relation
1499  */
1500 static void
1501 getRelationDescription(StringInfo buffer, Oid relid)
1502 {
1503         HeapTuple       relTup;
1504         Form_pg_class relForm;
1505         char       *nspname;
1506         char       *relname;
1507
1508         relTup = SearchSysCache(RELOID,
1509                                                         ObjectIdGetDatum(relid),
1510                                                         0, 0, 0);
1511         if (!HeapTupleIsValid(relTup))
1512                 elog(ERROR, "cache lookup of relation %u failed", relid);
1513         relForm = (Form_pg_class) GETSTRUCT(relTup);
1514
1515         /* Qualify the name if not visible in search path */
1516         if (RelationIsVisible(relid))
1517                 nspname = NULL;
1518         else
1519                 nspname = get_namespace_name(relForm->relnamespace);
1520
1521         relname = quote_qualified_identifier(nspname, NameStr(relForm->relname));
1522
1523         switch (relForm->relkind)
1524         {
1525                 case RELKIND_RELATION:
1526                         appendStringInfo(buffer, "table %s",
1527                                                          relname);
1528                         break;
1529                 case RELKIND_INDEX:
1530                         appendStringInfo(buffer, "index %s",
1531                                                          relname);
1532                         break;
1533                 case RELKIND_SPECIAL:
1534                         appendStringInfo(buffer, "special system relation %s",
1535                                                          relname);
1536                         break;
1537                 case RELKIND_SEQUENCE:
1538                         appendStringInfo(buffer, "sequence %s",
1539                                                          relname);
1540                         break;
1541                 case RELKIND_UNCATALOGED:
1542                         appendStringInfo(buffer, "uncataloged table %s",
1543                                                          relname);
1544                         break;
1545                 case RELKIND_TOASTVALUE:
1546                         appendStringInfo(buffer, "toast table %s",
1547                                                          relname);
1548                         break;
1549                 case RELKIND_VIEW:
1550                         appendStringInfo(buffer, "view %s",
1551                                                          relname);
1552                         break;
1553                 case RELKIND_COMPOSITE_TYPE:
1554                         appendStringInfo(buffer, "composite type %s",
1555                                                          relname);
1556                         break;
1557                 default:
1558                         /* shouldn't get here */
1559                         appendStringInfo(buffer, "relation %s",
1560                                                          relname);
1561                         break;
1562         }
1563
1564         ReleaseSysCache(relTup);
1565 }