]> granicus.if.org Git - postgresql/blob - src/backend/catalog/dependency.c
Move most of the error checking for foreign-key constraints out of
[postgresql] / src / backend / catalog / dependency.c
1 /*-------------------------------------------------------------------------
2  *
3  * dependency.c
4  *        Routines to support inter-object dependencies.
5  *
6  *
7  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.12 2002/09/22 00:37:09 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "catalog/catname.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_attrdef.h"
26 #include "catalog/pg_cast.h"
27 #include "catalog/pg_constraint.h"
28 #include "catalog/pg_conversion.h"
29 #include "catalog/pg_depend.h"
30 #include "catalog/pg_language.h"
31 #include "catalog/pg_opclass.h"
32 #include "catalog/pg_rewrite.h"
33 #include "catalog/pg_trigger.h"
34 #include "commands/comment.h"
35 #include "commands/defrem.h"
36 #include "commands/proclang.h"
37 #include "commands/schemacmds.h"
38 #include "commands/trigger.h"
39 #include "lib/stringinfo.h"
40 #include "miscadmin.h"
41 #include "optimizer/clauses.h"
42 #include "parser/parsetree.h"
43 #include "rewrite/rewriteRemove.h"
44 #include "utils/builtins.h"
45 #include "utils/fmgroids.h"
46 #include "utils/lsyscache.h"
47 #include "utils/syscache.h"
48
49
/*
 * ObjectClasses: one entry for every system catalog whose OID is allowed
 * to show up in the classid field of an object address.  The trailing
 * comment on each member names the catalog it stands for.
 */
typedef enum ObjectClasses
{
	OCLASS_CLASS,				/* pg_class */
	OCLASS_PROC,				/* pg_proc */
	OCLASS_TYPE,				/* pg_type */
	OCLASS_CAST,				/* pg_cast */
	OCLASS_CONSTRAINT,			/* pg_constraint */
	OCLASS_CONVERSION,			/* pg_conversion */
	OCLASS_DEFAULT,				/* pg_attrdef */
	OCLASS_LANGUAGE,			/* pg_language */
	OCLASS_OPERATOR,			/* pg_operator */
	OCLASS_OPCLASS,				/* pg_opclass */
	OCLASS_REWRITE,				/* pg_rewrite */
	OCLASS_TRIGGER,				/* pg_trigger */
	OCLASS_SCHEMA,				/* pg_namespace */
	MAX_OCLASS					/* count of classes; MUST BE LAST */
} ObjectClasses;
68
69 /* expansible list of ObjectAddresses */
70 typedef struct
71 {
72         ObjectAddress *refs;            /* => palloc'd array */
73         int                     numrefs;                /* current number of references */
74         int                     maxrefs;                /* current size of palloc'd array */
75 } ObjectAddresses;
76
77 /* for find_expr_references_walker */
78 typedef struct
79 {
80         ObjectAddresses addrs;          /* addresses being accumulated */
81         List       *rtables;            /* list of rangetables to resolve Vars */
82 } find_expr_references_context;
83
84
85 /*
86  * Because not all system catalogs have predetermined OIDs, we build a table
87  * mapping between ObjectClasses and OIDs.      This is done at most once per
88  * backend run, to minimize lookup overhead.
89  */
90 static bool object_classes_initialized = false;
91 static Oid      object_classes[MAX_OCLASS];
92
93
94 static void findAutoDeletableObjects(const ObjectAddress *object,
95                                                                          ObjectAddresses *oktodelete,
96                                                                          Relation depRel);
97 static bool recursiveDeletion(const ObjectAddress *object,
98                                   DropBehavior behavior,
99                                   const ObjectAddress *callingObject,
100                                   ObjectAddresses *oktodelete,
101                                   Relation depRel);
102 static void doDeletion(const ObjectAddress *object);
103 static bool find_expr_references_walker(Node *node,
104                                                         find_expr_references_context *context);
105 static void eliminate_duplicate_dependencies(ObjectAddresses *addrs);
106 static int      object_address_comparator(const void *a, const void *b);
107 static void init_object_addresses(ObjectAddresses *addrs);
108 static void add_object_address(ObjectClasses oclass, Oid objectId, int32 subId,
109                                    ObjectAddresses *addrs);
110 static void add_exact_object_address(const ObjectAddress *object,
111                                                  ObjectAddresses *addrs);
112 static bool object_address_present(const ObjectAddress *object,
113                                    ObjectAddresses *addrs);
114 static void term_object_addresses(ObjectAddresses *addrs);
115 static void init_object_classes(void);
116 static ObjectClasses getObjectClass(const ObjectAddress *object);
117 static char *getObjectDescription(const ObjectAddress *object);
118 static void getRelationDescription(StringInfo buffer, Oid relid);
119
120
121 /*
122  * performDeletion: attempt to drop the specified object.  If CASCADE
123  * behavior is specified, also drop any dependent objects (recursively).
124  * If RESTRICT behavior is specified, error out if there are any dependent
125  * objects, except for those that should be implicitly dropped anyway
126  * according to the dependency type.
127  *
128  * This is the outer control routine for all forms of DROP that drop objects
129  * that can participate in dependencies.
130  */
131 void
132 performDeletion(const ObjectAddress *object,
133                                 DropBehavior behavior)
134 {
135         char       *objDescription;
136         Relation        depRel;
137         ObjectAddresses oktodelete;
138
139         /*
140          * Get object description for possible use in failure message. Must do
141          * this before deleting it ...
142          */
143         objDescription = getObjectDescription(object);
144
145         /*
146          * We save some cycles by opening pg_depend just once and passing the
147          * Relation pointer down to all the recursive deletion steps.
148          */
149         depRel = heap_openr(DependRelationName, RowExclusiveLock);
150
151         /*
152          * Construct a list of objects that are reachable by AUTO or INTERNAL
153          * dependencies from the target object.  These should be deleted silently,
154          * even if the actual deletion pass first reaches one of them via a
155          * non-auto dependency.
156          */
157         init_object_addresses(&oktodelete);
158
159         findAutoDeletableObjects(object, &oktodelete, depRel);
160
161         if (!recursiveDeletion(object, behavior, NULL, &oktodelete, depRel))
162                 elog(ERROR, "Cannot drop %s because other objects depend on it"
163                          "\n\tUse DROP ... CASCADE to drop the dependent objects too",
164                          objDescription);
165
166         term_object_addresses(&oktodelete);
167
168         heap_close(depRel, RowExclusiveLock);
169
170         pfree(objDescription);
171 }
172
173
174 /*
175  * findAutoDeletableObjects: find all objects that are reachable by AUTO or
176  * INTERNAL dependency paths from the given object.  Add them all to the
177  * oktodelete list.  Note that the originally given object will also be
178  * added to the list.
179  *
180  * depRel is the already-open pg_depend relation.
181  */
182 static void
183 findAutoDeletableObjects(const ObjectAddress *object,
184                                                  ObjectAddresses *oktodelete,
185                                                  Relation depRel)
186 {
187         ScanKeyData key[3];
188         int                     nkeys;
189         SysScanDesc scan;
190         HeapTuple       tup;
191         ObjectAddress otherObject;
192
193         /*
194          * If this object is already in oktodelete, then we already visited it;
195          * don't do so again (this prevents infinite recursion if there's a loop
196          * in pg_depend).  Otherwise, add it.
197          */
198         if (object_address_present(object, oktodelete))
199                 return;
200         add_exact_object_address(object, oktodelete);
201
202         /*
203          * Scan pg_depend records that link to this object, showing the things
204          * that depend on it.  For each one that is AUTO or INTERNAL, visit the
205          * referencing object.
206          *
207          * When dropping a whole object (subId = 0), find pg_depend records for
208          * its sub-objects too.
209          */
210         ScanKeyEntryInitialize(&key[0], 0x0,
211                                                    Anum_pg_depend_refclassid, F_OIDEQ,
212                                                    ObjectIdGetDatum(object->classId));
213         ScanKeyEntryInitialize(&key[1], 0x0,
214                                                    Anum_pg_depend_refobjid, F_OIDEQ,
215                                                    ObjectIdGetDatum(object->objectId));
216         if (object->objectSubId != 0)
217         {
218                 ScanKeyEntryInitialize(&key[2], 0x0,
219                                                            Anum_pg_depend_refobjsubid, F_INT4EQ,
220                                                            Int32GetDatum(object->objectSubId));
221                 nkeys = 3;
222         }
223         else
224                 nkeys = 2;
225
226         scan = systable_beginscan(depRel, DependReferenceIndex, true,
227                                                           SnapshotNow, nkeys, key);
228
229         while (HeapTupleIsValid(tup = systable_getnext(scan)))
230         {
231                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
232
233                 switch (foundDep->deptype)
234                 {
235                         case DEPENDENCY_NORMAL:
236                                 /* ignore */
237                                 break;
238                         case DEPENDENCY_AUTO:
239                         case DEPENDENCY_INTERNAL:
240                                 /* recurse */
241                                 otherObject.classId = foundDep->classid;
242                                 otherObject.objectId = foundDep->objid;
243                                 otherObject.objectSubId = foundDep->objsubid;
244                                 findAutoDeletableObjects(&otherObject, oktodelete, depRel);
245                                 break;
246                         case DEPENDENCY_PIN:
247                                 /*
248                                  * For a PIN dependency we just elog immediately; there
249                                  * won't be any others to examine, and we aren't ever
250                                  * going to let the user delete it.
251                                  */
252                                 elog(ERROR, "Cannot drop %s because it is required by the database system",
253                                          getObjectDescription(object));
254                                 break;
255                         default:
256                                 elog(ERROR, "findAutoDeletableObjects: unknown dependency type '%c' for %s",
257                                          foundDep->deptype, getObjectDescription(object));
258                                 break;
259                 }
260         }
261
262         systable_endscan(scan);
263 }
264
265
266 /*
267  * recursiveDeletion: delete a single object for performDeletion, plus
268  * (recursively) anything that depends on it.
269  *
270  * Returns TRUE if successful, FALSE if not.
271  *
272  * callingObject is NULL at the outer level, else identifies the object that
273  * we recursed from (the reference object that someone else needs to delete).
274  *
275  * oktodelete is a list of objects verified deletable (ie, reachable by one
276  * or more AUTO or INTERNAL dependencies from the original target).
277  *
278  * depRel is the already-open pg_depend relation.
279  *
280  *
281  * In RESTRICT mode, we perform all the deletions anyway, but elog a NOTICE
282  * and return FALSE if we find a restriction violation.  performDeletion
283  * will then abort the transaction to nullify the deletions.  We have to
284  * do it this way to (a) report all the direct and indirect dependencies
285  * while (b) not going into infinite recursion if there's a cycle.
286  *
287  * This is even more complex than one could wish, because it is possible for
288  * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL
289  * dependencies.  Also, we might have a situation where we've been asked to
290  * delete object A, and objects B and C both have AUTO dependencies on A,
291  * but B also has a NORMAL dependency on C.  (Since any of these paths might
292  * be indirect, we can't prevent these scenarios, but must cope instead.)
293  * If we visit C before B then we would mistakenly decide that the B->C link
294  * should prevent the restricted drop from occurring.  To handle this, we make
295  * a pre-scan to find all the objects that are auto-deletable from A.  If we
296  * visit C first, but B is present in the oktodelete list, then we make no
297  * complaint but recurse to delete B anyway.  (Note that in general we must
298  * delete B before deleting C; the drop routine for B may try to access C.)
299  *
300  * Note: in the case where the path to B is traversed first, we will not
301  * see the NORMAL dependency when we reach C, because of the pg_depend
302  * removals done in step 1.  The oktodelete list is necessary just
303  * to make the behavior independent of the order in which pg_depend
304  * entries are visited.
305  */
306 static bool
307 recursiveDeletion(const ObjectAddress *object,
308                                   DropBehavior behavior,
309                                   const ObjectAddress *callingObject,
310                                   ObjectAddresses *oktodelete,
311                                   Relation depRel)
312 {
313         bool            ok = true;
314         char       *objDescription;
315         ScanKeyData key[3];
316         int                     nkeys;
317         SysScanDesc scan;
318         HeapTuple       tup;
319         ObjectAddress otherObject;
320         ObjectAddress owningObject;
321         bool            amOwned = false;
322
323         /*
324          * Get object description for possible use in messages.  Must do this
325          * before deleting it ...
326          */
327         objDescription = getObjectDescription(object);
328
329         /*
330          * Step 1: find and remove pg_depend records that link from this
331          * object to others.  We have to do this anyway, and doing it first
332          * ensures that we avoid infinite recursion in the case of cycles.
333          * Also, some dependency types require extra processing here.
334          *
335          * When dropping a whole object (subId = 0), remove all pg_depend records
336          * for its sub-objects too.
337          */
338         ScanKeyEntryInitialize(&key[0], 0x0,
339                                                    Anum_pg_depend_classid, F_OIDEQ,
340                                                    ObjectIdGetDatum(object->classId));
341         ScanKeyEntryInitialize(&key[1], 0x0,
342                                                    Anum_pg_depend_objid, F_OIDEQ,
343                                                    ObjectIdGetDatum(object->objectId));
344         if (object->objectSubId != 0)
345         {
346                 ScanKeyEntryInitialize(&key[2], 0x0,
347                                                            Anum_pg_depend_objsubid, F_INT4EQ,
348                                                            Int32GetDatum(object->objectSubId));
349                 nkeys = 3;
350         }
351         else
352                 nkeys = 2;
353
354         scan = systable_beginscan(depRel, DependDependerIndex, true,
355                                                           SnapshotNow, nkeys, key);
356
357         while (HeapTupleIsValid(tup = systable_getnext(scan)))
358         {
359                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
360
361                 otherObject.classId = foundDep->refclassid;
362                 otherObject.objectId = foundDep->refobjid;
363                 otherObject.objectSubId = foundDep->refobjsubid;
364
365                 switch (foundDep->deptype)
366                 {
367                         case DEPENDENCY_NORMAL:
368                         case DEPENDENCY_AUTO:
369                                 /* no problem */
370                                 break;
371                         case DEPENDENCY_INTERNAL:
372
373                                 /*
374                                  * This object is part of the internal implementation of
375                                  * another object.      We have three cases:
376                                  *
377                                  * 1. At the outermost recursion level, disallow the DROP.
378                                  * (We just elog here, rather than proceeding, since no
379                                  * other dependencies are likely to be interesting.)
380                                  */
381                                 if (callingObject == NULL)
382                                 {
383                                         char       *otherObjDesc = getObjectDescription(&otherObject);
384
385                                         elog(ERROR, "Cannot drop %s because %s requires it"
386                                                  "\n\tYou may drop %s instead",
387                                                  objDescription, otherObjDesc, otherObjDesc);
388                                 }
389
390                                 /*
391                                  * 2. When recursing from the other end of this
392                                  * dependency, it's okay to continue with the deletion.
393                                  * This holds when recursing from a whole object that
394                                  * includes the nominal other end as a component, too.
395                                  */
396                                 if (callingObject->classId == otherObject.classId &&
397                                         callingObject->objectId == otherObject.objectId &&
398                                 (callingObject->objectSubId == otherObject.objectSubId ||
399                                  callingObject->objectSubId == 0))
400                                         break;
401
402                                 /*
403                                  * 3. When recursing from anyplace else, transform this
404                                  * deletion request into a delete of the other object.
405                                  * (This will be an error condition iff RESTRICT mode.) In
406                                  * this case we finish deleting my dependencies except for
407                                  * the INTERNAL link, which will be needed to cause the
408                                  * owning object to recurse back to me.
409                                  */
410                                 if (amOwned)    /* shouldn't happen */
411                                         elog(ERROR, "recursiveDeletion: multiple INTERNAL dependencies for %s",
412                                                  objDescription);
413                                 owningObject = otherObject;
414                                 amOwned = true;
415                                 /* "continue" bypasses the simple_heap_delete call below */
416                                 continue;
417                         case DEPENDENCY_PIN:
418
419                                 /*
420                                  * Should not happen; PIN dependencies should have zeroes
421                                  * in the depender fields...
422                                  */
423                                 elog(ERROR, "recursiveDeletion: incorrect use of PIN dependency with %s",
424                                          objDescription);
425                                 break;
426                         default:
427                                 elog(ERROR, "recursiveDeletion: unknown dependency type '%c' for %s",
428                                          foundDep->deptype, objDescription);
429                                 break;
430                 }
431
432                 simple_heap_delete(depRel, &tup->t_self);
433         }
434
435         systable_endscan(scan);
436
437         /*
438          * CommandCounterIncrement here to ensure that preceding changes are
439          * all visible; in particular, that the above deletions of pg_depend
440          * entries are visible.  That prevents infinite recursion in case of a
441          * dependency loop (which is perfectly legal).
442          */
443         CommandCounterIncrement();
444
445         /*
446          * If we found we are owned by another object, ask it to delete itself
447          * instead of proceeding.  Complain if RESTRICT mode, unless the other
448          * object is in oktodelete.
449          */
450         if (amOwned)
451         {
452                 if (object_address_present(&owningObject, oktodelete))
453                         elog(DEBUG1, "Drop auto-cascades to %s",
454                                  getObjectDescription(&owningObject));
455                 else if (behavior == DROP_RESTRICT)
456                 {
457                         elog(NOTICE, "%s depends on %s",
458                                  getObjectDescription(&owningObject),
459                                  objDescription);
460                         ok = false;
461                 }
462                 else
463                         elog(NOTICE, "Drop cascades to %s",
464                                  getObjectDescription(&owningObject));
465
466                 if (!recursiveDeletion(&owningObject, behavior,
467                                                            object,
468                                                            oktodelete, depRel))
469                         ok = false;
470
471                 pfree(objDescription);
472
473                 return ok;
474         }
475
476         /*
477          * Step 2: scan pg_depend records that link to this object, showing
478          * the things that depend on it.  Recursively delete those things. (We
479          * don't delete the pg_depend records here, as the recursive call will
480          * do that.)  Note it's important to delete the dependent objects
481          * before the referenced one, since the deletion routines might do
482          * things like try to update the pg_class record when deleting a check
483          * constraint.
484          *
485          * Again, when dropping a whole object (subId = 0), find pg_depend
486          * records for its sub-objects too.
487          *
488          * NOTE: because we are using SnapshotNow, if a recursive call deletes
489          * any pg_depend tuples that our scan hasn't yet visited, we will not
490          * see them as good when we do visit them.      This is essential for
491          * correct behavior if there are multiple dependency paths between two
492          * objects --- else we might try to delete an already-deleted object.
493          */
494         ScanKeyEntryInitialize(&key[0], 0x0,
495                                                    Anum_pg_depend_refclassid, F_OIDEQ,
496                                                    ObjectIdGetDatum(object->classId));
497         ScanKeyEntryInitialize(&key[1], 0x0,
498                                                    Anum_pg_depend_refobjid, F_OIDEQ,
499                                                    ObjectIdGetDatum(object->objectId));
500         if (object->objectSubId != 0)
501         {
502                 ScanKeyEntryInitialize(&key[2], 0x0,
503                                                            Anum_pg_depend_refobjsubid, F_INT4EQ,
504                                                            Int32GetDatum(object->objectSubId));
505                 nkeys = 3;
506         }
507         else
508                 nkeys = 2;
509
510         scan = systable_beginscan(depRel, DependReferenceIndex, true,
511                                                           SnapshotNow, nkeys, key);
512
513         while (HeapTupleIsValid(tup = systable_getnext(scan)))
514         {
515                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
516
517                 otherObject.classId = foundDep->classid;
518                 otherObject.objectId = foundDep->objid;
519                 otherObject.objectSubId = foundDep->objsubid;
520
521                 switch (foundDep->deptype)
522                 {
523                         case DEPENDENCY_NORMAL:
524                                 /*
525                                  * Perhaps there was another dependency path that would
526                                  * have allowed silent deletion of the otherObject, had
527                                  * we only taken that path first.
528                                  * In that case, act like this link is AUTO, too.
529                                  */
530                                 if (object_address_present(&otherObject, oktodelete))
531                                         elog(DEBUG1, "Drop auto-cascades to %s",
532                                                  getObjectDescription(&otherObject));
533                                 else if (behavior == DROP_RESTRICT)
534                                 {
535                                         elog(NOTICE, "%s depends on %s",
536                                                  getObjectDescription(&otherObject),
537                                                  objDescription);
538                                         ok = false;
539                                 }
540                                 else
541                                         elog(NOTICE, "Drop cascades to %s",
542                                                  getObjectDescription(&otherObject));
543
544                                 if (!recursiveDeletion(&otherObject, behavior,
545                                                                            object,
546                                                                            oktodelete, depRel))
547                                         ok = false;
548                                 break;
549                         case DEPENDENCY_AUTO:
550                         case DEPENDENCY_INTERNAL:
551
552                                 /*
553                                  * We propagate the DROP without complaint even in the
554                                  * RESTRICT case.  (However, normal dependencies on the
555                                  * component object could still cause failure.)
556                                  */
557                                 elog(DEBUG1, "Drop auto-cascades to %s",
558                                          getObjectDescription(&otherObject));
559
560                                 if (!recursiveDeletion(&otherObject, behavior,
561                                                                            object,
562                                                                            oktodelete, depRel))
563                                         ok = false;
564                                 break;
565                         case DEPENDENCY_PIN:
566
567                                 /*
568                                  * For a PIN dependency we just elog immediately; there
569                                  * won't be any others to report.
570                                  */
571                                 elog(ERROR, "Cannot drop %s because it is required by the database system",
572                                          objDescription);
573                                 break;
574                         default:
575                                 elog(ERROR, "recursiveDeletion: unknown dependency type '%c' for %s",
576                                          foundDep->deptype, objDescription);
577                                 break;
578                 }
579         }
580
581         systable_endscan(scan);
582
583         /*
584          * We do not need CommandCounterIncrement here, since if step 2 did
585          * anything then each recursive call will have ended with one.
586          */
587
588         /*
589          * Step 3: delete the object itself.
590          */
591         doDeletion(object);
592
593         /*
594          * Delete any comments associated with this object.  (This is a
595          * convenient place to do it instead of having every object type know
596          * to do it.)
597          */
598         DeleteComments(object->objectId, object->classId, object->objectSubId);
599
600         /*
601          * CommandCounterIncrement here to ensure that preceding changes are
602          * all visible.
603          */
604         CommandCounterIncrement();
605
606         /*
607          * And we're done!
608          */
609         pfree(objDescription);
610
611         return ok;
612 }
613
614
615 /*
616  * doDeletion: actually delete a single object
617  */
618 static void
619 doDeletion(const ObjectAddress *object)
620 {
621         switch (getObjectClass(object))
622         {
623                 case OCLASS_CLASS:
624                         {
625                                 char            relKind = get_rel_relkind(object->objectId);
626
627                                 if (relKind == RELKIND_INDEX)
628                                 {
629                                         Assert(object->objectSubId == 0);
630                                         index_drop(object->objectId);
631                                 }
632                                 else
633                                 {
634                                         if (object->objectSubId != 0)
635                                                 RemoveAttributeById(object->objectId,
636                                                                                         object->objectSubId);
637                                         else
638                                                 heap_drop_with_catalog(object->objectId);
639                                 }
640                                 break;
641                         }
642
643                 case OCLASS_PROC:
644                         RemoveFunctionById(object->objectId);
645                         break;
646
647                 case OCLASS_TYPE:
648                         RemoveTypeById(object->objectId);
649                         break;
650
651                 case OCLASS_CAST:
652                         DropCastById(object->objectId);
653                         break;
654
655                 case OCLASS_CONSTRAINT:
656                         RemoveConstraintById(object->objectId);
657                         break;
658
659                 case OCLASS_CONVERSION:
660                         RemoveConversionById(object->objectId);
661                         break;
662
663                 case OCLASS_DEFAULT:
664                         RemoveAttrDefaultById(object->objectId);
665                         break;
666
667                 case OCLASS_LANGUAGE:
668                         DropProceduralLanguageById(object->objectId);
669                         break;
670
671                 case OCLASS_OPERATOR:
672                         RemoveOperatorById(object->objectId);
673                         break;
674
675                 case OCLASS_OPCLASS:
676                         RemoveOpClassById(object->objectId);
677                         break;
678
679                 case OCLASS_REWRITE:
680                         RemoveRewriteRuleById(object->objectId);
681                         break;
682
683                 case OCLASS_TRIGGER:
684                         RemoveTriggerById(object->objectId);
685                         break;
686
687                 case OCLASS_SCHEMA:
688                         RemoveSchemaById(object->objectId);
689                         break;
690
691                 default:
692                         elog(ERROR, "doDeletion: Unsupported object class %u",
693                                  object->classId);
694         }
695 }
696
697 /*
698  * recordDependencyOnExpr - find expression dependencies
699  *
700  * This is used to find the dependencies of rules, constraint expressions,
701  * etc.
702  *
703  * Given an expression or query in node-tree form, find all the objects
704  * it refers to (tables, columns, operators, functions, etc).  Record
705  * a dependency of the specified type from the given depender object
706  * to each object mentioned in the expression.
707  *
708  * rtable is the rangetable to be used to interpret Vars with varlevelsup=0.
709  * It can be NIL if no such variables are expected.
710  *
711  * XXX is it important to create dependencies on the datatypes mentioned in
712  * the expression?      In most cases this would be redundant (eg, a ref to an
713  * operator indirectly references its input and output datatypes), but I'm
714  * not quite convinced there are no cases where we need it.
715  */
716 void
717 recordDependencyOnExpr(const ObjectAddress *depender,
718                                            Node *expr, List *rtable,
719                                            DependencyType behavior)
720 {
721         find_expr_references_context context;
722
723         init_object_addresses(&context.addrs);
724
725         /* Set up interpretation for Vars at varlevelsup = 0 */
726         context.rtables = makeList1(rtable);
727
728         /* Scan the expression tree for referenceable objects */
729         find_expr_references_walker(expr, &context);
730
731         /* Remove any duplicates */
732         eliminate_duplicate_dependencies(&context.addrs);
733
734         /* And record 'em */
735         recordMultipleDependencies(depender,
736                                                            context.addrs.refs, context.addrs.numrefs,
737                                                            behavior);
738
739         term_object_addresses(&context.addrs);
740 }
741
742 /*
743  * Recursively search an expression tree for object references.
744  *
745  * Note: we avoid creating references to columns of tables that participate
746  * in an SQL JOIN construct, but are not actually used anywhere in the query.
747  * To do so, we do not scan the joinaliasvars list of a join RTE while
748  * scanning the query rangetable, but instead scan each individual entry
749  * of the alias list when we find a reference to it.
750  */
751 static bool
752 find_expr_references_walker(Node *node,
753                                                         find_expr_references_context *context)
754 {
755         if (node == NULL)
756                 return false;
757         if (IsA(node, Var))
758         {
759                 Var                *var = (Var *) node;
760                 int                     levelsup;
761                 List       *rtable,
762                                    *rtables;
763                 RangeTblEntry *rte;
764
765                 /* Find matching rtable entry, or complain if not found */
766                 levelsup = var->varlevelsup;
767                 rtables = context->rtables;
768                 while (levelsup--)
769                 {
770                         if (rtables == NIL)
771                                 break;
772                         rtables = lnext(rtables);
773                 }
774                 if (rtables == NIL)
775                         elog(ERROR, "find_expr_references_walker: bogus varlevelsup %d",
776                                  var->varlevelsup);
777                 rtable = lfirst(rtables);
778                 if (var->varno <= 0 || var->varno > length(rtable))
779                         elog(ERROR, "find_expr_references_walker: bogus varno %d",
780                                  var->varno);
781                 rte = rt_fetch(var->varno, rtable);
782                 if (rte->rtekind == RTE_RELATION)
783                 {
784                         /* If it's a plain relation, reference this column */
785                         /* NB: this code works for whole-row Var with attno 0, too */
786                         add_object_address(OCLASS_CLASS, rte->relid, var->varattno,
787                                                            &context->addrs);
788                 }
789                 else if (rte->rtekind == RTE_JOIN)
790                 {
791                         /* Scan join output column to add references to join inputs */
792                         if (var->varattno <= 0 ||
793                                 var->varattno > length(rte->joinaliasvars))
794                                 elog(ERROR, "find_expr_references_walker: bogus varattno %d",
795                                          var->varattno);
796                         find_expr_references_walker((Node *) nth(var->varattno - 1,
797                                                                                                          rte->joinaliasvars),
798                                                                                 context);
799                 }
800                 return false;
801         }
802         if (IsA(node, Expr))
803         {
804                 Expr       *expr = (Expr *) node;
805
806                 if (expr->opType == OP_EXPR)
807                 {
808                         Oper       *oper = (Oper *) expr->oper;
809
810                         add_object_address(OCLASS_OPERATOR, oper->opno, 0,
811                                                            &context->addrs);
812                 }
813                 else if (expr->opType == FUNC_EXPR)
814                 {
815                         Func       *func = (Func *) expr->oper;
816
817                         add_object_address(OCLASS_PROC, func->funcid, 0,
818                                                            &context->addrs);
819                 }
820                 /* fall through to examine arguments */
821         }
822         if (IsA(node, Aggref))
823         {
824                 Aggref     *aggref = (Aggref *) node;
825
826                 add_object_address(OCLASS_PROC, aggref->aggfnoid, 0,
827                                                    &context->addrs);
828                 /* fall through to examine arguments */
829         }
830         if (is_subplan(node))
831         {
832                 /* Extra work needed here if we ever need this case */
833                 elog(ERROR, "find_expr_references_walker: already-planned subqueries not supported");
834         }
835         if (IsA(node, Query))
836         {
837                 /* Recurse into RTE subquery or not-yet-planned sublink subquery */
838                 Query      *query = (Query *) node;
839                 List       *rtable;
840                 bool            result;
841
842                 /*
843                  * Add whole-relation refs for each plain relation mentioned in
844                  * the subquery's rtable.  (Note: query_tree_walker takes care of
845                  * recursing into RTE_FUNCTION and RTE_SUBQUERY RTEs, so no need
846                  * to do that here.  But keep it from looking at join alias lists.)
847                  */
848                 foreach(rtable, query->rtable)
849                 {
850                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
851
852                         if (rte->rtekind == RTE_RELATION)
853                                 add_object_address(OCLASS_CLASS, rte->relid, 0,
854                                                                    &context->addrs);
855                 }
856
857                 /* Examine substructure of query */
858                 context->rtables = lcons(query->rtable, context->rtables);
859                 result = query_tree_walker(query,
860                                                                    find_expr_references_walker,
861                                                                    (void *) context,
862                                                                    QTW_IGNORE_JOINALIASES);
863                 context->rtables = lnext(context->rtables);
864                 return result;
865         }
866         return expression_tree_walker(node, find_expr_references_walker,
867                                                                   (void *) context);
868 }
869
870 /*
871  * Given an array of dependency references, eliminate any duplicates.
872  */
873 static void
874 eliminate_duplicate_dependencies(ObjectAddresses *addrs)
875 {
876         ObjectAddress *priorobj;
877         int                     oldref,
878                                 newrefs;
879
880         if (addrs->numrefs <= 1)
881                 return;                                 /* nothing to do */
882
883         /* Sort the refs so that duplicates are adjacent */
884         qsort((void *) addrs->refs, addrs->numrefs, sizeof(ObjectAddress),
885                   object_address_comparator);
886
887         /* Remove dups */
888         priorobj = addrs->refs;
889         newrefs = 1;
890         for (oldref = 1; oldref < addrs->numrefs; oldref++)
891         {
892                 ObjectAddress *thisobj = addrs->refs + oldref;
893
894                 if (priorobj->classId == thisobj->classId &&
895                         priorobj->objectId == thisobj->objectId)
896                 {
897                         if (priorobj->objectSubId == thisobj->objectSubId)
898                                 continue;               /* identical, so drop thisobj */
899
900                         /*
901                          * If we have a whole-object reference and a reference to a
902                          * part of the same object, we don't need the whole-object
903                          * reference (for example, we don't need to reference both
904                          * table foo and column foo.bar).  The whole-object reference
905                          * will always appear first in the sorted list.
906                          */
907                         if (priorobj->objectSubId == 0)
908                         {
909                                 /* replace whole ref with partial */
910                                 priorobj->objectSubId = thisobj->objectSubId;
911                                 continue;
912                         }
913                 }
914                 /* Not identical, so add thisobj to output set */
915                 priorobj++;
916                 priorobj->classId = thisobj->classId;
917                 priorobj->objectId = thisobj->objectId;
918                 priorobj->objectSubId = thisobj->objectSubId;
919                 newrefs++;
920         }
921
922         addrs->numrefs = newrefs;
923 }
924
925 /*
926  * qsort comparator for ObjectAddress items
927  */
928 static int
929 object_address_comparator(const void *a, const void *b)
930 {
931         const ObjectAddress *obja = (const ObjectAddress *) a;
932         const ObjectAddress *objb = (const ObjectAddress *) b;
933
934         if (obja->classId < objb->classId)
935                 return -1;
936         if (obja->classId > objb->classId)
937                 return 1;
938         if (obja->objectId < objb->objectId)
939                 return -1;
940         if (obja->objectId > objb->objectId)
941                 return 1;
942
943         /*
944          * We sort the subId as an unsigned int so that 0 will come first. See
945          * logic in eliminate_duplicate_dependencies.
946          */
947         if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId)
948                 return -1;
949         if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId)
950                 return 1;
951         return 0;
952 }
953
954 /*
955  * Routines for handling an expansible array of ObjectAddress items.
956  *
957  * init_object_addresses: initialize an ObjectAddresses array.
958  */
959 static void
960 init_object_addresses(ObjectAddresses *addrs)
961 {
962         /* Initialize array to empty */
963         addrs->numrefs = 0;
964         addrs->maxrefs = 32;            /* arbitrary initial array size */
965         addrs->refs = (ObjectAddress *)
966                 palloc(addrs->maxrefs * sizeof(ObjectAddress));
967
968         /* Initialize object_classes[] if not done yet */
969         /* This will be needed by add_object_address() */
970         if (!object_classes_initialized)
971                 init_object_classes();
972 }
973
974 /*
975  * Add an entry to an ObjectAddresses array.
976  *
977  * It is convenient to specify the class by ObjectClass rather than directly
978  * by catalog OID.
979  */
980 static void
981 add_object_address(ObjectClasses oclass, Oid objectId, int32 subId,
982                                    ObjectAddresses *addrs)
983 {
984         ObjectAddress *item;
985
986         /* enlarge array if needed */
987         if (addrs->numrefs >= addrs->maxrefs)
988         {
989                 addrs->maxrefs *= 2;
990                 addrs->refs = (ObjectAddress *)
991                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
992         }
993         /* record this item */
994         item = addrs->refs + addrs->numrefs;
995         item->classId = object_classes[oclass];
996         item->objectId = objectId;
997         item->objectSubId = subId;
998         addrs->numrefs++;
999 }
1000
1001 /*
1002  * Add an entry to an ObjectAddresses array.
1003  *
1004  * As above, but specify entry exactly.
1005  */
1006 static void
1007 add_exact_object_address(const ObjectAddress *object,
1008                                                  ObjectAddresses *addrs)
1009 {
1010         ObjectAddress *item;
1011
1012         /* enlarge array if needed */
1013         if (addrs->numrefs >= addrs->maxrefs)
1014         {
1015                 addrs->maxrefs *= 2;
1016                 addrs->refs = (ObjectAddress *)
1017                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1018         }
1019         /* record this item */
1020         item = addrs->refs + addrs->numrefs;
1021         *item = *object;
1022         addrs->numrefs++;
1023 }
1024
1025 /*
1026  * Test whether an object is present in an ObjectAddresses array.
1027  *
1028  * We return "true" if object is a subobject of something in the array, too.
1029  */
1030 static bool
1031 object_address_present(const ObjectAddress *object,
1032                                            ObjectAddresses *addrs)
1033 {
1034         int                     i;
1035
1036         for (i = addrs->numrefs - 1; i >= 0; i--)
1037         {
1038                 ObjectAddress *thisobj = addrs->refs + i;
1039
1040                 if (object->classId == thisobj->classId &&
1041                         object->objectId == thisobj->objectId)
1042                 {
1043                         if (object->objectSubId == thisobj->objectSubId ||
1044                                 thisobj->objectSubId == 0)
1045                                 return true;
1046                 }
1047         }
1048
1049         return false;
1050 }
1051
1052 /*
1053  * Clean up when done with an ObjectAddresses array.
1054  */
1055 static void
1056 term_object_addresses(ObjectAddresses *addrs)
1057 {
1058         pfree(addrs->refs);
1059 }
1060
1061 /*
1062  * Initialize the object_classes[] table.
1063  *
1064  * Although some of these OIDs aren't compile-time constants, they surely
1065  * shouldn't change during a backend's run.  So, we look them up the
1066  * first time through and then cache them.
1067  */
1068 static void
1069 init_object_classes(void)
1070 {
1071         object_classes[OCLASS_CLASS] = RelOid_pg_class;
1072         object_classes[OCLASS_PROC] = RelOid_pg_proc;
1073         object_classes[OCLASS_TYPE] = RelOid_pg_type;
1074         object_classes[OCLASS_CAST] = get_system_catalog_relid(CastRelationName);
1075         object_classes[OCLASS_CONSTRAINT] = get_system_catalog_relid(ConstraintRelationName);
1076         object_classes[OCLASS_CONVERSION] = get_system_catalog_relid(ConversionRelationName);
1077         object_classes[OCLASS_DEFAULT] = get_system_catalog_relid(AttrDefaultRelationName);
1078         object_classes[OCLASS_LANGUAGE] = get_system_catalog_relid(LanguageRelationName);
1079         object_classes[OCLASS_OPERATOR] = get_system_catalog_relid(OperatorRelationName);
1080         object_classes[OCLASS_OPCLASS] = get_system_catalog_relid(OperatorClassRelationName);
1081         object_classes[OCLASS_REWRITE] = get_system_catalog_relid(RewriteRelationName);
1082         object_classes[OCLASS_TRIGGER] = get_system_catalog_relid(TriggerRelationName);
1083         object_classes[OCLASS_SCHEMA] = get_system_catalog_relid(NamespaceRelationName);
1084         object_classes_initialized = true;
1085 }
1086
1087 /*
1088  * Determine the class of a given object identified by objectAddress.
1089  *
1090  * This function is needed just because some of the system catalogs do
1091  * not have hardwired-at-compile-time OIDs.
1092  */
1093 static ObjectClasses
1094 getObjectClass(const ObjectAddress *object)
1095 {
1096         /* Easy for the bootstrapped catalogs... */
1097         switch (object->classId)
1098         {
1099                 case RelOid_pg_class:
1100                         /* caller must check objectSubId */
1101                         return OCLASS_CLASS;
1102
1103                 case RelOid_pg_proc:
1104                         Assert(object->objectSubId == 0);
1105                         return OCLASS_PROC;
1106
1107                 case RelOid_pg_type:
1108                         Assert(object->objectSubId == 0);
1109                         return OCLASS_TYPE;
1110         }
1111
1112         /*
1113          * Handle cases where catalog's OID is not hardwired.
1114          */
1115         if (!object_classes_initialized)
1116                 init_object_classes();
1117
1118         if (object->classId == object_classes[OCLASS_CAST])
1119         {
1120                 Assert(object->objectSubId == 0);
1121                 return OCLASS_CAST;
1122         }
1123         if (object->classId == object_classes[OCLASS_CONSTRAINT])
1124         {
1125                 Assert(object->objectSubId == 0);
1126                 return OCLASS_CONSTRAINT;
1127         }
1128         if (object->classId == object_classes[OCLASS_CONVERSION])
1129         {
1130                 Assert(object->objectSubId == 0);
1131                 return OCLASS_CONVERSION;
1132         }
1133         if (object->classId == object_classes[OCLASS_DEFAULT])
1134         {
1135                 Assert(object->objectSubId == 0);
1136                 return OCLASS_DEFAULT;
1137         }
1138         if (object->classId == object_classes[OCLASS_LANGUAGE])
1139         {
1140                 Assert(object->objectSubId == 0);
1141                 return OCLASS_LANGUAGE;
1142         }
1143         if (object->classId == object_classes[OCLASS_OPERATOR])
1144         {
1145                 Assert(object->objectSubId == 0);
1146                 return OCLASS_OPERATOR;
1147         }
1148         if (object->classId == object_classes[OCLASS_OPCLASS])
1149         {
1150                 Assert(object->objectSubId == 0);
1151                 return OCLASS_OPCLASS;
1152         }
1153         if (object->classId == object_classes[OCLASS_REWRITE])
1154         {
1155                 Assert(object->objectSubId == 0);
1156                 return OCLASS_REWRITE;
1157         }
1158         if (object->classId == object_classes[OCLASS_TRIGGER])
1159         {
1160                 Assert(object->objectSubId == 0);
1161                 return OCLASS_TRIGGER;
1162         }
1163         if (object->classId == object_classes[OCLASS_SCHEMA])
1164         {
1165                 Assert(object->objectSubId == 0);
1166                 return OCLASS_SCHEMA;
1167         }
1168
1169         elog(ERROR, "getObjectClass: Unknown object class %u",
1170                  object->classId);
1171         return OCLASS_CLASS;            /* keep compiler quiet */
1172 }
1173
1174 /*
1175  * getObjectDescription: build an object description for messages
1176  *
1177  * The result is a palloc'd string.
1178  */
1179 static char *
1180 getObjectDescription(const ObjectAddress *object)
1181 {
1182         StringInfoData buffer;
1183
1184         initStringInfo(&buffer);
1185
1186         switch (getObjectClass(object))
1187         {
1188                 case OCLASS_CLASS:
1189                         getRelationDescription(&buffer, object->objectId);
1190                         if (object->objectSubId != 0)
1191                                 appendStringInfo(&buffer, " column %s",
1192                                                                  get_attname(object->objectId,
1193                                                                                          object->objectSubId));
1194                         break;
1195
1196                 case OCLASS_PROC:
1197                         appendStringInfo(&buffer, "function %s",
1198                                                          format_procedure(object->objectId));
1199                         break;
1200
1201                 case OCLASS_TYPE:
1202                         appendStringInfo(&buffer, "type %s",
1203                                                          format_type_be(object->objectId));
1204                         break;
1205
1206                 case OCLASS_CAST:
1207                         {
1208                                 Relation        castDesc;
1209                                 ScanKeyData skey[1];
1210                                 SysScanDesc rcscan;
1211                                 HeapTuple       tup;
1212                                 Form_pg_cast castForm;
1213
1214                                 castDesc = heap_openr(CastRelationName, AccessShareLock);
1215
1216                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1217                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1218                                                                          ObjectIdGetDatum(object->objectId));
1219
1220                                 rcscan = systable_beginscan(castDesc, CastOidIndex, true,
1221                                                                                         SnapshotNow, 1, skey);
1222
1223                                 tup = systable_getnext(rcscan);
1224
1225                                 if (!HeapTupleIsValid(tup))
1226                                         elog(ERROR, "getObjectDescription: Cast %u does not exist",
1227                                                  object->objectId);
1228
1229                                 castForm = (Form_pg_cast) GETSTRUCT(tup);
1230
1231                                 appendStringInfo(&buffer, "cast from %s to %s",
1232                                                                  format_type_be(castForm->castsource),
1233                                                                  format_type_be(castForm->casttarget));
1234
1235                                 systable_endscan(rcscan);
1236                                 heap_close(castDesc, AccessShareLock);
1237                                 break;
1238                         }
1239
1240                 case OCLASS_CONSTRAINT:
1241                         {
1242                                 Relation        conDesc;
1243                                 ScanKeyData skey[1];
1244                                 SysScanDesc rcscan;
1245                                 HeapTuple       tup;
1246                                 Form_pg_constraint con;
1247
1248                                 conDesc = heap_openr(ConstraintRelationName, AccessShareLock);
1249
1250                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1251                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1252                                                                          ObjectIdGetDatum(object->objectId));
1253
1254                                 rcscan = systable_beginscan(conDesc, ConstraintOidIndex, true,
1255                                                                                         SnapshotNow, 1, skey);
1256
1257                                 tup = systable_getnext(rcscan);
1258
1259                                 if (!HeapTupleIsValid(tup))
1260                                         elog(ERROR, "getObjectDescription: Constraint %u does not exist",
1261                                                  object->objectId);
1262
1263                                 con = (Form_pg_constraint) GETSTRUCT(tup);
1264
1265                                 if (OidIsValid(con->conrelid))
1266                                 {
1267                                         appendStringInfo(&buffer, "constraint %s on ",
1268                                                                          NameStr(con->conname));
1269                                         getRelationDescription(&buffer, con->conrelid);
1270                                 }
1271                                 else
1272                                 {
1273                                         appendStringInfo(&buffer, "constraint %s",
1274                                                                          NameStr(con->conname));
1275                                 }
1276
1277                                 systable_endscan(rcscan);
1278                                 heap_close(conDesc, AccessShareLock);
1279                                 break;
1280                         }
1281
1282                 case OCLASS_CONVERSION:
1283                         {
1284                                 HeapTuple       conTup;
1285
1286                                 conTup = SearchSysCache(CONOID,
1287                                                                           ObjectIdGetDatum(object->objectId),
1288                                                                                 0, 0, 0);
1289                                 if (!HeapTupleIsValid(conTup))
1290                                         elog(ERROR, "getObjectDescription: Conversion %u does not exist",
1291                                                  object->objectId);
1292                                 appendStringInfo(&buffer, "conversion %s",
1293                                                                  NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname));
1294                                 ReleaseSysCache(conTup);
1295                                 break;
1296                         }
1297
1298                 case OCLASS_DEFAULT:
1299                         {
1300                                 Relation        attrdefDesc;
1301                                 ScanKeyData skey[1];
1302                                 SysScanDesc adscan;
1303                                 HeapTuple       tup;
1304                                 Form_pg_attrdef attrdef;
1305                                 ObjectAddress colobject;
1306
1307                                 attrdefDesc = heap_openr(AttrDefaultRelationName, AccessShareLock);
1308
1309                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1310                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1311                                                                          ObjectIdGetDatum(object->objectId));
1312
1313                                 adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndex, true,
1314                                                                                         SnapshotNow, 1, skey);
1315
1316                                 tup = systable_getnext(adscan);
1317
1318                                 if (!HeapTupleIsValid(tup))
1319                                         elog(ERROR, "getObjectDescription: Default %u does not exist",
1320                                                  object->objectId);
1321
1322                                 attrdef = (Form_pg_attrdef) GETSTRUCT(tup);
1323
1324                                 colobject.classId = RelOid_pg_class;
1325                                 colobject.objectId = attrdef->adrelid;
1326                                 colobject.objectSubId = attrdef->adnum;
1327
1328                                 appendStringInfo(&buffer, "default for %s",
1329                                                                  getObjectDescription(&colobject));
1330
1331                                 systable_endscan(adscan);
1332                                 heap_close(attrdefDesc, AccessShareLock);
1333                                 break;
1334                         }
1335
1336                 case OCLASS_LANGUAGE:
1337                         {
1338                                 HeapTuple       langTup;
1339
1340                                 langTup = SearchSysCache(LANGOID,
1341                                                                           ObjectIdGetDatum(object->objectId),
1342                                                                                  0, 0, 0);
1343                                 if (!HeapTupleIsValid(langTup))
1344                                         elog(ERROR, "getObjectDescription: Language %u does not exist",
1345                                                  object->objectId);
1346                                 appendStringInfo(&buffer, "language %s",
1347                                                                  NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname));
1348                                 ReleaseSysCache(langTup);
1349                                 break;
1350                         }
1351
1352                 case OCLASS_OPERATOR:
1353                         appendStringInfo(&buffer, "operator %s",
1354                                                          format_operator(object->objectId));
1355                         break;
1356
1357                 case OCLASS_OPCLASS:
1358                         {
1359                                 HeapTuple       opcTup;
1360                                 Form_pg_opclass opcForm;
1361                                 HeapTuple       amTup;
1362                                 Form_pg_am      amForm;
1363                                 char       *nspname;
1364
1365                                 opcTup = SearchSysCache(CLAOID,
1366                                                                           ObjectIdGetDatum(object->objectId),
1367                                                                                 0, 0, 0);
1368                                 if (!HeapTupleIsValid(opcTup))
1369                                         elog(ERROR, "cache lookup of opclass %u failed",
1370                                                  object->objectId);
1371                                 opcForm = (Form_pg_opclass) GETSTRUCT(opcTup);
1372
1373                                 /* Qualify the name if not visible in search path */
1374                                 if (OpclassIsVisible(object->objectId))
1375                                         nspname = NULL;
1376                                 else
1377                                         nspname = get_namespace_name(opcForm->opcnamespace);
1378
1379                                 appendStringInfo(&buffer, "operator class %s",
1380                                                                  quote_qualified_identifier(nspname,
1381                                                                                          NameStr(opcForm->opcname)));
1382
1383                                 amTup = SearchSysCache(AMOID,
1384                                                                            ObjectIdGetDatum(opcForm->opcamid),
1385                                                                            0, 0, 0);
1386                                 if (!HeapTupleIsValid(amTup))
1387                                         elog(ERROR, "syscache lookup for AM %u failed",
1388                                                  opcForm->opcamid);
1389                                 amForm = (Form_pg_am) GETSTRUCT(amTup);
1390
1391                                 appendStringInfo(&buffer, " for %s",
1392                                                                  NameStr(amForm->amname));
1393
1394                                 ReleaseSysCache(amTup);
1395                                 ReleaseSysCache(opcTup);
1396                                 break;
1397                         }
1398
1399                 case OCLASS_REWRITE:
1400                         {
1401                                 Relation        ruleDesc;
1402                                 ScanKeyData skey[1];
1403                                 SysScanDesc rcscan;
1404                                 HeapTuple       tup;
1405                                 Form_pg_rewrite rule;
1406
1407                                 ruleDesc = heap_openr(RewriteRelationName, AccessShareLock);
1408
1409                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1410                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1411                                                                          ObjectIdGetDatum(object->objectId));
1412
1413                                 rcscan = systable_beginscan(ruleDesc, RewriteOidIndex, true,
1414                                                                                         SnapshotNow, 1, skey);
1415
1416                                 tup = systable_getnext(rcscan);
1417
1418                                 if (!HeapTupleIsValid(tup))
1419                                         elog(ERROR, "getObjectDescription: Rule %u does not exist",
1420                                                  object->objectId);
1421
1422                                 rule = (Form_pg_rewrite) GETSTRUCT(tup);
1423
1424                                 appendStringInfo(&buffer, "rule %s on ",
1425                                                                  NameStr(rule->rulename));
1426                                 getRelationDescription(&buffer, rule->ev_class);
1427
1428                                 systable_endscan(rcscan);
1429                                 heap_close(ruleDesc, AccessShareLock);
1430                                 break;
1431                         }
1432
1433                 case OCLASS_TRIGGER:
1434                         {
1435                                 Relation        trigDesc;
1436                                 ScanKeyData skey[1];
1437                                 SysScanDesc tgscan;
1438                                 HeapTuple       tup;
1439                                 Form_pg_trigger trig;
1440
1441                                 trigDesc = heap_openr(TriggerRelationName, AccessShareLock);
1442
1443                                 ScanKeyEntryInitialize(&skey[0], 0x0,
1444                                                                            ObjectIdAttributeNumber, F_OIDEQ,
1445                                                                          ObjectIdGetDatum(object->objectId));
1446
1447                                 tgscan = systable_beginscan(trigDesc, TriggerOidIndex, true,
1448                                                                                         SnapshotNow, 1, skey);
1449
1450                                 tup = systable_getnext(tgscan);
1451
1452                                 if (!HeapTupleIsValid(tup))
1453                                         elog(ERROR, "getObjectDescription: Trigger %u does not exist",
1454                                                  object->objectId);
1455
1456                                 trig = (Form_pg_trigger) GETSTRUCT(tup);
1457
1458                                 appendStringInfo(&buffer, "trigger %s on ",
1459                                                                  NameStr(trig->tgname));
1460                                 getRelationDescription(&buffer, trig->tgrelid);
1461
1462                                 systable_endscan(tgscan);
1463                                 heap_close(trigDesc, AccessShareLock);
1464                                 break;
1465                         }
1466
1467                 case OCLASS_SCHEMA:
1468                         {
1469                                 char       *nspname;
1470
1471                                 nspname = get_namespace_name(object->objectId);
1472                                 if (!nspname)
1473                                         elog(ERROR, "getObjectDescription: Schema %u does not exist",
1474                                                  object->objectId);
1475                                 appendStringInfo(&buffer, "schema %s", nspname);
1476                                 break;
1477                         }
1478
1479                 default:
1480                         appendStringInfo(&buffer, "unknown object %u %u %d",
1481                                                          object->classId,
1482                                                          object->objectId,
1483                                                          object->objectSubId);
1484                         break;
1485         }
1486
1487         return buffer.data;
1488 }
1489
1490 /*
1491  * subroutine for getObjectDescription: describe a relation
1492  */
1493 static void
1494 getRelationDescription(StringInfo buffer, Oid relid)
1495 {
1496         HeapTuple       relTup;
1497         Form_pg_class relForm;
1498         char       *nspname;
1499         char       *relname;
1500
1501         relTup = SearchSysCache(RELOID,
1502                                                         ObjectIdGetDatum(relid),
1503                                                         0, 0, 0);
1504         if (!HeapTupleIsValid(relTup))
1505                 elog(ERROR, "cache lookup of relation %u failed", relid);
1506         relForm = (Form_pg_class) GETSTRUCT(relTup);
1507
1508         /* Qualify the name if not visible in search path */
1509         if (RelationIsVisible(relid))
1510                 nspname = NULL;
1511         else
1512                 nspname = get_namespace_name(relForm->relnamespace);
1513
1514         relname = quote_qualified_identifier(nspname, NameStr(relForm->relname));
1515
1516         switch (relForm->relkind)
1517         {
1518                 case RELKIND_RELATION:
1519                         appendStringInfo(buffer, "table %s",
1520                                                          relname);
1521                         break;
1522                 case RELKIND_INDEX:
1523                         appendStringInfo(buffer, "index %s",
1524                                                          relname);
1525                         break;
1526                 case RELKIND_SPECIAL:
1527                         appendStringInfo(buffer, "special system relation %s",
1528                                                          relname);
1529                         break;
1530                 case RELKIND_SEQUENCE:
1531                         appendStringInfo(buffer, "sequence %s",
1532                                                          relname);
1533                         break;
1534                 case RELKIND_UNCATALOGED:
1535                         appendStringInfo(buffer, "uncataloged table %s",
1536                                                          relname);
1537                         break;
1538                 case RELKIND_TOASTVALUE:
1539                         appendStringInfo(buffer, "toast table %s",
1540                                                          relname);
1541                         break;
1542                 case RELKIND_VIEW:
1543                         appendStringInfo(buffer, "view %s",
1544                                                          relname);
1545                         break;
1546                 case RELKIND_COMPOSITE_TYPE:
1547                         appendStringInfo(buffer, "composite type %s",
1548                                                          relname);
1549                         break;
1550                 default:
1551                         /* shouldn't get here */
1552                         appendStringInfo(buffer, "relation %s",
1553                                                          relname);
1554                         break;
1555         }
1556
1557         ReleaseSysCache(relTup);
1558 }