]> granicus.if.org Git - postgresql/blob - src/backend/catalog/dependency.c
Improve the recently-added support for properly pluralized error messages
[postgresql] / src / backend / catalog / dependency.c
1 /*-------------------------------------------------------------------------
2  *
3  * dependency.c
4  *        Routines to support inter-object dependencies.
5  *
6  *
7  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.88 2009/06/04 18:33:06 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/sysattr.h"
20 #include "access/xact.h"
21 #include "catalog/dependency.h"
22 #include "catalog/heap.h"
23 #include "catalog/index.h"
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/pg_amop.h"
27 #include "catalog/pg_amproc.h"
28 #include "catalog/pg_attrdef.h"
29 #include "catalog/pg_authid.h"
30 #include "catalog/pg_cast.h"
31 #include "catalog/pg_constraint.h"
32 #include "catalog/pg_conversion.h"
33 #include "catalog/pg_conversion_fn.h"
34 #include "catalog/pg_database.h"
35 #include "catalog/pg_depend.h"
36 #include "catalog/pg_foreign_data_wrapper.h"
37 #include "catalog/pg_foreign_server.h"
38 #include "catalog/pg_language.h"
39 #include "catalog/pg_namespace.h"
40 #include "catalog/pg_opclass.h"
41 #include "catalog/pg_operator.h"
42 #include "catalog/pg_opfamily.h"
43 #include "catalog/pg_proc.h"
44 #include "catalog/pg_rewrite.h"
45 #include "catalog/pg_tablespace.h"
46 #include "catalog/pg_trigger.h"
47 #include "catalog/pg_ts_config.h"
48 #include "catalog/pg_ts_dict.h"
49 #include "catalog/pg_ts_parser.h"
50 #include "catalog/pg_ts_template.h"
51 #include "catalog/pg_type.h"
52 #include "catalog/pg_user_mapping.h"
53 #include "commands/comment.h"
54 #include "commands/dbcommands.h"
55 #include "commands/defrem.h"
56 #include "commands/proclang.h"
57 #include "commands/schemacmds.h"
58 #include "commands/tablespace.h"
59 #include "commands/trigger.h"
60 #include "commands/typecmds.h"
61 #include "foreign/foreign.h"
62 #include "miscadmin.h"
63 #include "nodes/nodeFuncs.h"
64 #include "parser/parsetree.h"
65 #include "rewrite/rewriteRemove.h"
66 #include "storage/lmgr.h"
67 #include "utils/builtins.h"
68 #include "utils/fmgroids.h"
69 #include "utils/guc.h"
70 #include "utils/lsyscache.h"
71 #include "utils/syscache.h"
72 #include "utils/tqual.h"
73
74
75 /*
76  * Deletion processing requires additional state for each ObjectAddress that
77  * it's planning to delete.  For simplicity and code-sharing we make the
78  * ObjectAddresses code support arrays with or without this extra state.
79  */
80 typedef struct
81 {
82         int                     flags;                  /* bitmask, see bit definitions below */
83         ObjectAddress dependee;         /* object whose deletion forced this one */
84 } ObjectAddressExtra;
85
/*
 * Flag bits for ObjectAddressExtra.flags.  Each bit records one way in
 * which an object was reached while collecting deletion targets.
 */

/* object is an original deletion target */
#define DEPFLAG_ORIGINAL    (1 << 0)

/* object was reached via a normal dependency */
#define DEPFLAG_NORMAL      (1 << 1)

/* object was reached via an auto dependency */
#define DEPFLAG_AUTO        (1 << 2)

/* object was reached via an internal dependency */
#define DEPFLAG_INTERNAL    (1 << 3)
91
92
93 /* expansible list of ObjectAddresses */
94 struct ObjectAddresses
95 {
96         ObjectAddress *refs;            /* => palloc'd array */
97         ObjectAddressExtra *extras;     /* => palloc'd array, or NULL if not used */
98         int                     numrefs;                /* current number of references */
99         int                     maxrefs;                /* current size of palloc'd array(s) */
100 };
101
102 /* typedef ObjectAddresses appears in dependency.h */
103
104 /* threaded list of ObjectAddresses, for recursion detection */
105 typedef struct ObjectAddressStack
106 {
107         const ObjectAddress *object; /* object being visited */
108         int                     flags;                  /* its current flag bits */
109         struct ObjectAddressStack *next; /* next outer stack level */
110 } ObjectAddressStack;
111
112 /* for find_expr_references_walker */
113 typedef struct
114 {
115         ObjectAddresses *addrs;         /* addresses being accumulated */
116         List       *rtables;            /* list of rangetables to resolve Vars */
117 } find_expr_references_context;
118
119 /*
120  * This constant table maps ObjectClasses to the corresponding catalog OIDs.
121  * See also getObjectClass().
122  */
123 static const Oid object_classes[MAX_OCLASS] = {
124         RelationRelationId,                     /* OCLASS_CLASS */
125         ProcedureRelationId,            /* OCLASS_PROC */
126         TypeRelationId,                         /* OCLASS_TYPE */
127         CastRelationId,                         /* OCLASS_CAST */
128         ConstraintRelationId,           /* OCLASS_CONSTRAINT */
129         ConversionRelationId,           /* OCLASS_CONVERSION */
130         AttrDefaultRelationId,          /* OCLASS_DEFAULT */
131         LanguageRelationId,                     /* OCLASS_LANGUAGE */
132         OperatorRelationId,                     /* OCLASS_OPERATOR */
133         OperatorClassRelationId,        /* OCLASS_OPCLASS */
134         OperatorFamilyRelationId,       /* OCLASS_OPFAMILY */
135         AccessMethodOperatorRelationId,         /* OCLASS_AMOP */
136         AccessMethodProcedureRelationId,        /* OCLASS_AMPROC */
137         RewriteRelationId,                      /* OCLASS_REWRITE */
138         TriggerRelationId,                      /* OCLASS_TRIGGER */
139         NamespaceRelationId,            /* OCLASS_SCHEMA */
140         TSParserRelationId,                     /* OCLASS_TSPARSER */
141         TSDictionaryRelationId,         /* OCLASS_TSDICT */
142         TSTemplateRelationId,           /* OCLASS_TSTEMPLATE */
143         TSConfigRelationId,                     /* OCLASS_TSCONFIG */
144         AuthIdRelationId,                       /* OCLASS_ROLE */
145         DatabaseRelationId,                     /* OCLASS_DATABASE */
146         TableSpaceRelationId            /* OCLASS_TBLSPACE */
147 };
148
149
150 static void findDependentObjects(const ObjectAddress *object,
151                                          int flags,
152                                          ObjectAddressStack *stack,
153                                          ObjectAddresses *targetObjects,
154                                          const ObjectAddresses *pendingObjects,
155                                          Relation depRel);
156 static void reportDependentObjects(const ObjectAddresses *targetObjects,
157                                            DropBehavior behavior,
158                                            int msglevel,
159                                            const ObjectAddress *origObject);
160 static void deleteOneObject(const ObjectAddress *object, Relation depRel);
161 static void doDeletion(const ObjectAddress *object);
162 static void AcquireDeletionLock(const ObjectAddress *object);
163 static void ReleaseDeletionLock(const ObjectAddress *object);
164 static bool find_expr_references_walker(Node *node,
165                                                         find_expr_references_context *context);
166 static void eliminate_duplicate_dependencies(ObjectAddresses *addrs);
167 static int      object_address_comparator(const void *a, const void *b);
168 static void add_object_address(ObjectClass oclass, Oid objectId, int32 subId,
169                                    ObjectAddresses *addrs);
170 static void add_exact_object_address_extra(const ObjectAddress *object,
171                                                            const ObjectAddressExtra *extra,
172                                                            ObjectAddresses *addrs);
173 static bool object_address_present_add_flags(const ObjectAddress *object,
174                                                                                          int flags,
175                                                                                          ObjectAddresses *addrs);
176 static void getRelationDescription(StringInfo buffer, Oid relid);
177 static void getOpFamilyDescription(StringInfo buffer, Oid opfid);
178
179
180 /*
181  * performDeletion: attempt to drop the specified object.  If CASCADE
182  * behavior is specified, also drop any dependent objects (recursively).
183  * If RESTRICT behavior is specified, error out if there are any dependent
184  * objects, except for those that should be implicitly dropped anyway
185  * according to the dependency type.
186  *
187  * This is the outer control routine for all forms of DROP that drop objects
188  * that can participate in dependencies.  Note that the next two routines
189  * are variants on the same theme; if you change anything here you'll likely
190  * need to fix them too.
191  */
192 void
193 performDeletion(const ObjectAddress *object,
194                                 DropBehavior behavior)
195 {
196         Relation        depRel;
197         ObjectAddresses *targetObjects;
198         int                     i;
199
200         /*
201          * We save some cycles by opening pg_depend just once and passing the
202          * Relation pointer down to all the recursive deletion steps.
203          */
204         depRel = heap_open(DependRelationId, RowExclusiveLock);
205
206         /*
207          * Acquire deletion lock on the target object.  (Ideally the caller has
208          * done this already, but many places are sloppy about it.)
209          */
210         AcquireDeletionLock(object);
211
212         /*
213          * Construct a list of objects to delete (ie, the given object plus
214          * everything directly or indirectly dependent on it).
215          */
216         targetObjects = new_object_addresses();
217
218         findDependentObjects(object,
219                                                  DEPFLAG_ORIGINAL,
220                                                  NULL,  /* empty stack */
221                                                  targetObjects,
222                                                  NULL,  /* no pendingObjects */
223                                                  depRel);
224
225         /*
226          * Check if deletion is allowed, and report about cascaded deletes.
227          */
228         reportDependentObjects(targetObjects,
229                                                    behavior,
230                                                    NOTICE,
231                                                    object);
232
233         /*
234          * Delete all the objects in the proper order.
235          */
236         for (i = 0; i < targetObjects->numrefs; i++)
237         {
238                 ObjectAddress *thisobj = targetObjects->refs + i;
239
240                 deleteOneObject(thisobj, depRel);
241         }
242
243         /* And clean up */
244         free_object_addresses(targetObjects);
245
246         heap_close(depRel, RowExclusiveLock);
247 }
248
249 /*
250  * performMultipleDeletions: Similar to performDeletion, but act on multiple
251  * objects at once.
252  *
253  * The main difference from issuing multiple performDeletion calls is that the
254  * list of objects that would be implicitly dropped, for each object to be
255  * dropped, is the union of the implicit-object list for all objects.  This
256  * makes each check be more relaxed.
257  */
258 void
259 performMultipleDeletions(const ObjectAddresses *objects,
260                                                  DropBehavior behavior)
261 {
262         Relation        depRel;
263         ObjectAddresses *targetObjects;
264         int                     i;
265
266         /* No work if no objects... */
267         if (objects->numrefs <= 0)
268                 return;
269
270         /*
271          * We save some cycles by opening pg_depend just once and passing the
272          * Relation pointer down to all the recursive deletion steps.
273          */
274         depRel = heap_open(DependRelationId, RowExclusiveLock);
275
276         /*
277          * Construct a list of objects to delete (ie, the given objects plus
278          * everything directly or indirectly dependent on them).  Note that
279          * because we pass the whole objects list as pendingObjects context,
280          * we won't get a failure from trying to delete an object that is
281          * internally dependent on another one in the list; we'll just skip
282          * that object and delete it when we reach its owner.
283          */
284         targetObjects = new_object_addresses();
285
286         for (i = 0; i < objects->numrefs; i++)
287         {
288                 const ObjectAddress *thisobj = objects->refs + i;
289
290                 /*
291                  * Acquire deletion lock on each target object.  (Ideally the caller
292                  * has done this already, but many places are sloppy about it.)
293                  */
294                 AcquireDeletionLock(thisobj);
295
296                 findDependentObjects(thisobj,
297                                                          DEPFLAG_ORIGINAL,
298                                                          NULL,  /* empty stack */
299                                                          targetObjects,
300                                                          objects,
301                                                          depRel);
302         }
303
304         /*
305          * Check if deletion is allowed, and report about cascaded deletes.
306          *
307          * If there's exactly one object being deleted, report it the same
308          * way as in performDeletion(), else we have to be vaguer.
309          */
310         reportDependentObjects(targetObjects,
311                                                    behavior,
312                                                    NOTICE,
313                                                    (objects->numrefs == 1 ? objects->refs : NULL));
314
315         /*
316          * Delete all the objects in the proper order.
317          */
318         for (i = 0; i < targetObjects->numrefs; i++)
319         {
320                 ObjectAddress *thisobj = targetObjects->refs + i;
321
322                 deleteOneObject(thisobj, depRel);
323         }
324
325         /* And clean up */
326         free_object_addresses(targetObjects);
327
328         heap_close(depRel, RowExclusiveLock);
329 }
330
331 /*
332  * deleteWhatDependsOn: attempt to drop everything that depends on the
333  * specified object, though not the object itself.      Behavior is always
334  * CASCADE.
335  *
336  * This is currently used only to clean out the contents of a schema
337  * (namespace): the passed object is a namespace.  We normally want this
338  * to be done silently, so there's an option to suppress NOTICE messages.
339  */
340 void
341 deleteWhatDependsOn(const ObjectAddress *object,
342                                         bool showNotices)
343 {
344         Relation        depRel;
345         ObjectAddresses *targetObjects;
346         int                     i;
347
348         /*
349          * We save some cycles by opening pg_depend just once and passing the
350          * Relation pointer down to all the recursive deletion steps.
351          */
352         depRel = heap_open(DependRelationId, RowExclusiveLock);
353
354         /*
355          * Acquire deletion lock on the target object.  (Ideally the caller has
356          * done this already, but many places are sloppy about it.)
357          */
358         AcquireDeletionLock(object);
359
360         /*
361          * Construct a list of objects to delete (ie, the given object plus
362          * everything directly or indirectly dependent on it).
363          */
364         targetObjects = new_object_addresses();
365
366         findDependentObjects(object,
367                                                  DEPFLAG_ORIGINAL,
368                                                  NULL,  /* empty stack */
369                                                  targetObjects,
370                                                  NULL,  /* no pendingObjects */
371                                                  depRel);
372
373         /*
374          * Check if deletion is allowed, and report about cascaded deletes.
375          */
376         reportDependentObjects(targetObjects,
377                                                    DROP_CASCADE,
378                                                    showNotices ? NOTICE : DEBUG2,
379                                                    object);
380
381         /*
382          * Delete all the objects in the proper order, except we skip the original
383          * object.
384          */
385         for (i = 0; i < targetObjects->numrefs; i++)
386         {
387                 ObjectAddress *thisobj = targetObjects->refs + i;
388                 ObjectAddressExtra *thisextra = targetObjects->extras + i;
389
390                 if (thisextra->flags & DEPFLAG_ORIGINAL)
391                         continue;
392
393                 deleteOneObject(thisobj, depRel);
394         }
395
396         /* And clean up */
397         free_object_addresses(targetObjects);
398
399         heap_close(depRel, RowExclusiveLock);
400 }
401
402 /*
403  * findDependentObjects - find all objects that depend on 'object'
404  *
405  * For every object that depends on the starting object, acquire a deletion
406  * lock on the object, add it to targetObjects (if not already there),
407  * and recursively find objects that depend on it.  An object's dependencies
408  * will be placed into targetObjects before the object itself; this means
409  * that the finished list's order represents a safe deletion order.
410  *
411  * The caller must already have a deletion lock on 'object' itself,
412  * but must not have added it to targetObjects.  (Note: there are corner
413  * cases where we won't add the object either, and will also release the
414  * caller-taken lock.  This is a bit ugly, but the API is set up this way
415  * to allow easy rechecking of an object's liveness after we lock it.  See
416  * notes within the function.)
417  *
418  * When dropping a whole object (subId = 0), we find dependencies for
419  * its sub-objects too.
420  *
421  *      object: the object to add to targetObjects and find dependencies on
422  *      flags: flags to be ORed into the object's targetObjects entry
423  *      stack: list of objects being visited in current recursion; topmost item
424  *                      is the object that we recursed from (NULL for external callers)
425  *      targetObjects: list of objects that are scheduled to be deleted
426  *      pendingObjects: list of other objects slated for destruction, but
427  *                      not necessarily in targetObjects yet (can be NULL if none)
428  *      depRel: already opened pg_depend relation
429  */
430 static void
431 findDependentObjects(const ObjectAddress *object,
432                                          int flags,
433                                          ObjectAddressStack *stack,
434                                          ObjectAddresses *targetObjects,
435                                          const ObjectAddresses *pendingObjects,
436                                          Relation depRel)
437 {
438         ScanKeyData key[3];
439         int                     nkeys;
440         SysScanDesc scan;
441         HeapTuple       tup;
442         ObjectAddress otherObject;
443         ObjectAddressStack mystack;
444         ObjectAddressExtra extra;
445         ObjectAddressStack *stackptr;
446
447         /*
448          * If the target object is already being visited in an outer recursion
449          * level, just report the current flags back to that level and exit.
450          * This is needed to avoid infinite recursion in the face of circular
451          * dependencies.
452          *
453          * The stack check alone would result in dependency loops being broken at
454          * an arbitrary point, ie, the first member object of the loop to be
455          * visited is the last one to be deleted.  This is obviously unworkable.
456          * However, the check for internal dependency below guarantees that we
457          * will not break a loop at an internal dependency: if we enter the loop
458          * at an "owned" object we will switch and start at the "owning" object
459          * instead.  We could probably hack something up to avoid breaking at an
460          * auto dependency, too, if we had to.  However there are no known cases
461          * where that would be necessary.
462          */
463         for (stackptr = stack; stackptr; stackptr = stackptr->next)
464         {
465                 if (object->classId == stackptr->object->classId &&
466                         object->objectId == stackptr->object->objectId)
467                 {
468                         if (object->objectSubId == stackptr->object->objectSubId)
469                         {
470                                 stackptr->flags |= flags;
471                                 return;
472                         }
473                         /*
474                          * Could visit column with whole table already on stack; this is
475                          * the same case noted in object_address_present_add_flags().
476                          * (It's not clear this can really happen, but we might as well
477                          * check.)
478                          */
479                         if (stackptr->object->objectSubId == 0)
480                                 return;
481                 }
482         }
483
484         /*
485          * It's also possible that the target object has already been completely
486          * processed and put into targetObjects.  If so, again we just add the
487          * specified flags to its entry and return.
488          *
489          * (Note: in these early-exit cases we could release the caller-taken
490          * lock, since the object is presumably now locked multiple times;
491          * but it seems not worth the cycles.)
492          */
493         if (object_address_present_add_flags(object, flags, targetObjects))
494                 return;
495
496         /*
497          * The target object might be internally dependent on some other object
498          * (its "owner").  If so, and if we aren't recursing from the owning
499          * object, we have to transform this deletion request into a deletion
500          * request of the owning object.  (We'll eventually recurse back to this
501          * object, but the owning object has to be visited first so it will be
502          * deleted after.)  The way to find out about this is to scan the
503          * pg_depend entries that show what this object depends on.
504          */
505         ScanKeyInit(&key[0],
506                                 Anum_pg_depend_classid,
507                                 BTEqualStrategyNumber, F_OIDEQ,
508                                 ObjectIdGetDatum(object->classId));
509         ScanKeyInit(&key[1],
510                                 Anum_pg_depend_objid,
511                                 BTEqualStrategyNumber, F_OIDEQ,
512                                 ObjectIdGetDatum(object->objectId));
513         if (object->objectSubId != 0)
514         {
515                 ScanKeyInit(&key[2],
516                                         Anum_pg_depend_objsubid,
517                                         BTEqualStrategyNumber, F_INT4EQ,
518                                         Int32GetDatum(object->objectSubId));
519                 nkeys = 3;
520         }
521         else
522                 nkeys = 2;
523
524         scan = systable_beginscan(depRel, DependDependerIndexId, true,
525                                                           SnapshotNow, nkeys, key);
526
527         while (HeapTupleIsValid(tup = systable_getnext(scan)))
528         {
529                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
530
531                 otherObject.classId = foundDep->refclassid;
532                 otherObject.objectId = foundDep->refobjid;
533                 otherObject.objectSubId = foundDep->refobjsubid;
534
535                 switch (foundDep->deptype)
536                 {
537                         case DEPENDENCY_NORMAL:
538                         case DEPENDENCY_AUTO:
539                                 /* no problem */
540                                 break;
541                         case DEPENDENCY_INTERNAL:
542
543                                 /*
544                                  * This object is part of the internal implementation of
545                                  * another object.      We have three cases:
546                                  *
547                                  * 1. At the outermost recursion level, disallow the DROP. (We
548                                  * just ereport here, rather than proceeding, since no other
549                                  * dependencies are likely to be interesting.)  However, if
550                                  * the other object is listed in pendingObjects, just release
551                                  * the caller's lock and return; we'll eventually complete
552                                  * the DROP when we reach that entry in the pending list.
553                                  */
554                                 if (stack == NULL)
555                                 {
556                                         char       *otherObjDesc;
557
558                                         if (object_address_present(&otherObject, pendingObjects))
559                                         {
560                                                 systable_endscan(scan);
561                                                 /* need to release caller's lock; see notes below */
562                                                 ReleaseDeletionLock(object);
563                                                 return;
564                                         }
565                                         otherObjDesc = getObjectDescription(&otherObject);
566                                         ereport(ERROR,
567                                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
568                                                          errmsg("cannot drop %s because %s requires it",
569                                                                         getObjectDescription(object),
570                                                                         otherObjDesc),
571                                                          errhint("You can drop %s instead.",
572                                                                          otherObjDesc)));
573                                 }
574
575                                 /*
576                                  * 2. When recursing from the other end of this dependency,
577                                  * it's okay to continue with the deletion. This holds when
578                                  * recursing from a whole object that includes the nominal
579                                  * other end as a component, too.
580                                  */
581                                 if (stack->object->classId == otherObject.classId &&
582                                         stack->object->objectId == otherObject.objectId &&
583                                         (stack->object->objectSubId == otherObject.objectSubId ||
584                                          stack->object->objectSubId == 0))
585                                         break;
586
587                                 /*
588                                  * 3. When recursing from anyplace else, transform this
589                                  * deletion request into a delete of the other object.
590                                  *
591                                  * First, release caller's lock on this object and get
592                                  * deletion lock on the other object.  (We must release
593                                  * caller's lock to avoid deadlock against a concurrent
594                                  * deletion of the other object.)
595                                  */
596                                 ReleaseDeletionLock(object);
597                                 AcquireDeletionLock(&otherObject);
598
599                                 /*
600                                  * The other object might have been deleted while we waited
601                                  * to lock it; if so, neither it nor the current object are
602                                  * interesting anymore.  We test this by checking the
603                                  * pg_depend entry (see notes below).
604                                  */
605                                 if (!systable_recheck_tuple(scan, tup))
606                                 {
607                                         systable_endscan(scan);
608                                         ReleaseDeletionLock(&otherObject);
609                                         return;
610                                 }
611
612                                 /*
613                                  * Okay, recurse to the other object instead of proceeding.
614                                  * We treat this exactly as if the original reference had
615                                  * linked to that object instead of this one; hence, pass
616                                  * through the same flags and stack.
617                                  */
618                                 findDependentObjects(&otherObject,
619                                                                          flags,
620                                                                          stack,
621                                                                          targetObjects,
622                                                                          pendingObjects,
623                                                                          depRel);
624                                 /* And we're done here. */
625                                 systable_endscan(scan);
626                                 return;
627                         case DEPENDENCY_PIN:
628
629                                 /*
630                                  * Should not happen; PIN dependencies should have zeroes in
631                                  * the depender fields...
632                                  */
633                                 elog(ERROR, "incorrect use of PIN dependency with %s",
634                                          getObjectDescription(object));
635                                 break;
636                         default:
637                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
638                                          foundDep->deptype, getObjectDescription(object));
639                                 break;
640                 }
641         }
642
643         systable_endscan(scan);
644
645         /*
646          * Now recurse to any dependent objects.  We must visit them first
647          * since they have to be deleted before the current object.
648          */
649         mystack.object = object;        /* set up a new stack level */
650         mystack.flags = flags;
651         mystack.next = stack;
652
653         ScanKeyInit(&key[0],
654                                 Anum_pg_depend_refclassid,
655                                 BTEqualStrategyNumber, F_OIDEQ,
656                                 ObjectIdGetDatum(object->classId));
657         ScanKeyInit(&key[1],
658                                 Anum_pg_depend_refobjid,
659                                 BTEqualStrategyNumber, F_OIDEQ,
660                                 ObjectIdGetDatum(object->objectId));
661         if (object->objectSubId != 0)
662         {
663                 ScanKeyInit(&key[2],
664                                         Anum_pg_depend_refobjsubid,
665                                         BTEqualStrategyNumber, F_INT4EQ,
666                                         Int32GetDatum(object->objectSubId));
667                 nkeys = 3;
668         }
669         else
670                 nkeys = 2;
671
672         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
673                                                           SnapshotNow, nkeys, key);
674
675         while (HeapTupleIsValid(tup = systable_getnext(scan)))
676         {
677                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
678                 int             subflags;
679
680                 otherObject.classId = foundDep->classid;
681                 otherObject.objectId = foundDep->objid;
682                 otherObject.objectSubId = foundDep->objsubid;
683
684                 /*
685                  * Must lock the dependent object before recursing to it.
686                  */
687                 AcquireDeletionLock(&otherObject);
688
689                 /*
690                  * The dependent object might have been deleted while we waited
691                  * to lock it; if so, we don't need to do anything more with it.
692                  * We can test this cheaply and independently of the object's type
693                  * by seeing if the pg_depend tuple we are looking at is still live.
694                  * (If the object got deleted, the tuple would have been deleted too.)
695                  */
696                 if (!systable_recheck_tuple(scan, tup))
697                 {
698                         /* release the now-useless lock */
699                         ReleaseDeletionLock(&otherObject);
700                         /* and continue scanning for dependencies */
701                         continue;
702                 }
703
704                 /* Recurse, passing flags indicating the dependency type */
705                 switch (foundDep->deptype)
706                 {
707                         case DEPENDENCY_NORMAL:
708                                 subflags = DEPFLAG_NORMAL;
709                                 break;
710                         case DEPENDENCY_AUTO:
711                                 subflags = DEPFLAG_AUTO;
712                                 break;
713                         case DEPENDENCY_INTERNAL:
714                                 subflags = DEPFLAG_INTERNAL;
715                                 break;
716                         case DEPENDENCY_PIN:
717
718                                 /*
719                                  * For a PIN dependency we just ereport immediately; there
720                                  * won't be any others to report.
721                                  */
722                                 ereport(ERROR,
723                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
724                                                  errmsg("cannot drop %s because it is required by the database system",
725                                                                 getObjectDescription(object))));
726                                 subflags = 0;   /* keep compiler quiet */
727                                 break;
728                         default:
729                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
730                                          foundDep->deptype, getObjectDescription(object));
731                                 subflags = 0;   /* keep compiler quiet */
732                                 break;
733                 }
734
735                 findDependentObjects(&otherObject,
736                                                          subflags,
737                                                          &mystack,
738                                                          targetObjects,
739                                                          pendingObjects,
740                                                          depRel);
741         }
742
743         systable_endscan(scan);
744
745         /*
746          * Finally, we can add the target object to targetObjects.  Be careful
747          * to include any flags that were passed back down to us from inner
748          * recursion levels.
749          */
750         extra.flags = mystack.flags;
751         if (stack)
752                 extra.dependee = *stack->object;
753         else
754                 memset(&extra.dependee, 0, sizeof(extra.dependee));
755         add_exact_object_address_extra(object, &extra, targetObjects);
756 }
757
758 /*
759  * reportDependentObjects - report about dependencies, and fail if RESTRICT
760  *
761  * Tell the user about dependent objects that we are going to delete
762  * (or would need to delete, but are prevented by RESTRICT mode);
763  * then error out if there are any and it's not CASCADE mode.
764  *
765  *      targetObjects: list of objects that are scheduled to be deleted
766  *      behavior: RESTRICT or CASCADE
767  *      msglevel: elog level for non-error report messages
768  *      origObject: base object of deletion, or NULL if not available
769  *              (the latter case occurs in DROP OWNED)
770  */
771 static void
772 reportDependentObjects(const ObjectAddresses *targetObjects,
773                                            DropBehavior behavior,
774                                            int msglevel,
775                                            const ObjectAddress *origObject)
776 {
777         bool            ok = true;
778         StringInfoData clientdetail;
779         StringInfoData logdetail;
780         int                     numReportedClient = 0;
781         int                     numNotReportedClient = 0;
782         int                     i;
783
784         /*
785          * If no error is to be thrown, and the msglevel is too low to be shown
786          * to either client or server log, there's no need to do any of the work.
787          *
788          * Note: this code doesn't know all there is to be known about elog
789          * levels, but it works for NOTICE and DEBUG2, which are the only values
790          * msglevel can currently have.  We also assume we are running in a normal
791          * operating environment.
792          */
793         if (behavior == DROP_CASCADE &&
794                 msglevel < client_min_messages &&
795                 (msglevel < log_min_messages || log_min_messages == LOG))
796                 return;
797
798         /*
799          * We limit the number of dependencies reported to the client to
800          * MAX_REPORTED_DEPS, since client software may not deal well with
801          * enormous error strings.      The server log always gets a full report.
802          */
803 #define MAX_REPORTED_DEPS 100
804
805         initStringInfo(&clientdetail);
806         initStringInfo(&logdetail);
807
808         /*
809          * We process the list back to front (ie, in dependency order not deletion
810          * order), since this makes for a more understandable display.
811          */
812         for (i = targetObjects->numrefs - 1; i >= 0; i--)
813         {
814                 const ObjectAddress *obj = &targetObjects->refs[i];
815                 const ObjectAddressExtra *extra = &targetObjects->extras[i];
816                 char       *objDesc;
817
818                 /* Ignore the original deletion target(s) */
819                 if (extra->flags & DEPFLAG_ORIGINAL)
820                         continue;
821
822                 objDesc = getObjectDescription(obj);
823
824                 /*
825                  * If, at any stage of the recursive search, we reached the object
826                  * via an AUTO or INTERNAL dependency, then it's okay to delete it
827                  * even in RESTRICT mode.
828                  */
829                 if (extra->flags & (DEPFLAG_AUTO | DEPFLAG_INTERNAL))
830                 {
831                         /*
832                          * auto-cascades are reported at DEBUG2, not msglevel.  We
833                          * don't try to combine them with the regular message because
834                          * the results are too confusing when client_min_messages and
835                          * log_min_messages are different.
836                          */
837                         ereport(DEBUG2,
838                                         (errmsg("drop auto-cascades to %s",
839                                                         objDesc)));
840                 }
841                 else if (behavior == DROP_RESTRICT)
842                 {
843                         char   *otherDesc = getObjectDescription(&extra->dependee);
844
845                         if (numReportedClient < MAX_REPORTED_DEPS)
846                         {
847                                 /* separate entries with a newline */
848                                 if (clientdetail.len != 0)
849                                         appendStringInfoChar(&clientdetail, '\n');
850                                 appendStringInfo(&clientdetail, _("%s depends on %s"),
851                                                                  objDesc, otherDesc);
852                                 numReportedClient++;
853                         }
854                         else
855                                 numNotReportedClient++;
856                         /* separate entries with a newline */
857                         if (logdetail.len != 0)
858                                 appendStringInfoChar(&logdetail, '\n');
859                         appendStringInfo(&logdetail, _("%s depends on %s"),
860                                                          objDesc, otherDesc);
861                         pfree(otherDesc);
862                         ok = false;
863                 }
864                 else
865                 {
866                         if (numReportedClient < MAX_REPORTED_DEPS)
867                         {
868                                 /* separate entries with a newline */
869                                 if (clientdetail.len != 0)
870                                         appendStringInfoChar(&clientdetail, '\n');
871                                 appendStringInfo(&clientdetail, _("drop cascades to %s"),
872                                                                  objDesc);
873                                 numReportedClient++;
874                         }
875                         else
876                                 numNotReportedClient++;
877                         /* separate entries with a newline */
878                         if (logdetail.len != 0)
879                                 appendStringInfoChar(&logdetail, '\n');
880                         appendStringInfo(&logdetail, _("drop cascades to %s"),
881                                                          objDesc);
882                 }
883
884                 pfree(objDesc);
885         }
886
887         if (numNotReportedClient > 0)
888                 appendStringInfo(&clientdetail, ngettext("\nand %d other object "
889                                                                                                  "(see server log for list)",
890                                                                                                  "\nand %d other objects "
891                                                                                                  "(see server log for list)",
892                                                                                                  numNotReportedClient),
893                                                  numNotReportedClient);
894
895         if (!ok)
896         {
897                 if (origObject)
898                         ereport(ERROR,
899                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
900                                          errmsg("cannot drop %s because other objects depend on it",
901                                                         getObjectDescription(origObject)),
902                                          errdetail("%s", clientdetail.data),
903                                          errdetail_log("%s", logdetail.data),
904                                          errhint("Use DROP ... CASCADE to drop the dependent objects too.")));
905                 else
906                         ereport(ERROR,
907                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
908                                          errmsg("cannot drop desired object(s) because other objects depend on them"),
909                                          errdetail("%s", clientdetail.data),
910                                          errdetail_log("%s", logdetail.data),
911                                          errhint("Use DROP ... CASCADE to drop the dependent objects too.")));
912         }
913         else if (numReportedClient > 1)
914         {
915                 ereport(msglevel,
916                                 /* translator: %d always has a value larger than 1 */
917                                 (errmsg_plural("drop cascades to %d other object",
918                                                            "drop cascades to %d other objects",
919                                                            numReportedClient + numNotReportedClient,
920                                                            numReportedClient + numNotReportedClient),
921                                  errdetail("%s", clientdetail.data),
922                                  errdetail_log("%s", logdetail.data)));
923         }
924         else if (numReportedClient == 1)
925         {
926                 /* we just use the single item as-is */
927                 ereport(msglevel,
928                                 (errmsg_internal("%s", clientdetail.data)));
929         }
930
931         pfree(clientdetail.data);
932         pfree(logdetail.data);
933 }
934
935 /*
936  * deleteOneObject: delete a single object for performDeletion.
937  *
938  * depRel is the already-open pg_depend relation.
939  */
940 static void
941 deleteOneObject(const ObjectAddress *object, Relation depRel)
942 {
943         ScanKeyData key[3];
944         int                     nkeys;
945         SysScanDesc scan;
946         HeapTuple       tup;
947
948         /*
949          * First remove any pg_depend records that link from this object to
950          * others.  (Any records linking to this object should be gone already.)
951          *
952          * When dropping a whole object (subId = 0), remove all pg_depend records
953          * for its sub-objects too.
954          */
955         ScanKeyInit(&key[0],
956                                 Anum_pg_depend_classid,
957                                 BTEqualStrategyNumber, F_OIDEQ,
958                                 ObjectIdGetDatum(object->classId));
959         ScanKeyInit(&key[1],
960                                 Anum_pg_depend_objid,
961                                 BTEqualStrategyNumber, F_OIDEQ,
962                                 ObjectIdGetDatum(object->objectId));
963         if (object->objectSubId != 0)
964         {
965                 ScanKeyInit(&key[2],
966                                         Anum_pg_depend_objsubid,
967                                         BTEqualStrategyNumber, F_INT4EQ,
968                                         Int32GetDatum(object->objectSubId));
969                 nkeys = 3;
970         }
971         else
972                 nkeys = 2;
973
974         scan = systable_beginscan(depRel, DependDependerIndexId, true,
975                                                           SnapshotNow, nkeys, key);
976
977         while (HeapTupleIsValid(tup = systable_getnext(scan)))
978         {
979                 simple_heap_delete(depRel, &tup->t_self);
980         }
981
982         systable_endscan(scan);
983
984         /*
985          * Delete shared dependency references related to this object.  Again,
986          * if subId = 0, remove records for sub-objects too.
987          */
988         deleteSharedDependencyRecordsFor(object->classId, object->objectId,
989                                                                          object->objectSubId);
990
991         /*
992          * Now delete the object itself, in an object-type-dependent way.
993          */
994         doDeletion(object);
995
996         /*
997          * Delete any comments associated with this object.  (This is a convenient
998          * place to do it instead of having every object type know to do it.)
999          */
1000         DeleteComments(object->objectId, object->classId, object->objectSubId);
1001
1002         /*
1003          * CommandCounterIncrement here to ensure that preceding changes are all
1004          * visible to the next deletion step.
1005          */
1006         CommandCounterIncrement();
1007
1008         /*
1009          * And we're done!
1010          */
1011 }
1012
1013 /*
1014  * doDeletion: actually delete a single object
1015  */
1016 static void
1017 doDeletion(const ObjectAddress *object)
1018 {
1019         switch (getObjectClass(object))
1020         {
1021                 case OCLASS_CLASS:
1022                         {
1023                                 char            relKind = get_rel_relkind(object->objectId);
1024
1025                                 if (relKind == RELKIND_INDEX)
1026                                 {
1027                                         Assert(object->objectSubId == 0);
1028                                         index_drop(object->objectId);
1029                                 }
1030                                 else
1031                                 {
1032                                         if (object->objectSubId != 0)
1033                                                 RemoveAttributeById(object->objectId,
1034                                                                                         object->objectSubId);
1035                                         else
1036                                                 heap_drop_with_catalog(object->objectId);
1037                                 }
1038                                 break;
1039                         }
1040
1041                 case OCLASS_PROC:
1042                         RemoveFunctionById(object->objectId);
1043                         break;
1044
1045                 case OCLASS_TYPE:
1046                         RemoveTypeById(object->objectId);
1047                         break;
1048
1049                 case OCLASS_CAST:
1050                         DropCastById(object->objectId);
1051                         break;
1052
1053                 case OCLASS_CONSTRAINT:
1054                         RemoveConstraintById(object->objectId);
1055                         break;
1056
1057                 case OCLASS_CONVERSION:
1058                         RemoveConversionById(object->objectId);
1059                         break;
1060
1061                 case OCLASS_DEFAULT:
1062                         RemoveAttrDefaultById(object->objectId);
1063                         break;
1064
1065                 case OCLASS_LANGUAGE:
1066                         DropProceduralLanguageById(object->objectId);
1067                         break;
1068
1069                 case OCLASS_OPERATOR:
1070                         RemoveOperatorById(object->objectId);
1071                         break;
1072
1073                 case OCLASS_OPCLASS:
1074                         RemoveOpClassById(object->objectId);
1075                         break;
1076
1077                 case OCLASS_OPFAMILY:
1078                         RemoveOpFamilyById(object->objectId);
1079                         break;
1080
1081                 case OCLASS_AMOP:
1082                         RemoveAmOpEntryById(object->objectId);
1083                         break;
1084
1085                 case OCLASS_AMPROC:
1086                         RemoveAmProcEntryById(object->objectId);
1087                         break;
1088
1089                 case OCLASS_REWRITE:
1090                         RemoveRewriteRuleById(object->objectId);
1091                         break;
1092
1093                 case OCLASS_TRIGGER:
1094                         RemoveTriggerById(object->objectId);
1095                         break;
1096
1097                 case OCLASS_SCHEMA:
1098                         RemoveSchemaById(object->objectId);
1099                         break;
1100
1101                 case OCLASS_TSPARSER:
1102                         RemoveTSParserById(object->objectId);
1103                         break;
1104
1105                 case OCLASS_TSDICT:
1106                         RemoveTSDictionaryById(object->objectId);
1107                         break;
1108
1109                 case OCLASS_TSTEMPLATE:
1110                         RemoveTSTemplateById(object->objectId);
1111                         break;
1112
1113                 case OCLASS_TSCONFIG:
1114                         RemoveTSConfigurationById(object->objectId);
1115                         break;
1116
1117                 case OCLASS_USER_MAPPING:
1118                         RemoveUserMappingById(object->objectId);
1119                         break;
1120
1121                 case OCLASS_FOREIGN_SERVER:
1122                         RemoveForeignServerById(object->objectId);
1123                         break;
1124
1125                 case OCLASS_FDW:
1126                         RemoveForeignDataWrapperById(object->objectId);
1127                         break;
1128
1129                         /* OCLASS_ROLE, OCLASS_DATABASE, OCLASS_TBLSPACE not handled */
1130
1131                 default:
1132                         elog(ERROR, "unrecognized object class: %u",
1133                                  object->classId);
1134         }
1135 }
1136
1137 /*
1138  * AcquireDeletionLock - acquire a suitable lock for deleting an object
1139  *
1140  * We use LockRelation for relations, LockDatabaseObject for everything
1141  * else.  Note that dependency.c is not concerned with deleting any kind of
1142  * shared-across-databases object, so we have no need for LockSharedObject.
1143  */
1144 static void
1145 AcquireDeletionLock(const ObjectAddress *object)
1146 {
1147         if (object->classId == RelationRelationId)
1148                 LockRelationOid(object->objectId, AccessExclusiveLock);
1149         else
1150                 /* assume we should lock the whole object not a sub-object */
1151                 LockDatabaseObject(object->classId, object->objectId, 0,
1152                                                    AccessExclusiveLock);
1153 }
1154
1155 /*
1156  * ReleaseDeletionLock - release an object deletion lock
1157  */
1158 static void
1159 ReleaseDeletionLock(const ObjectAddress *object)
1160 {
1161         if (object->classId == RelationRelationId)
1162                 UnlockRelationOid(object->objectId, AccessExclusiveLock);
1163         else
1164                 /* assume we should lock the whole object not a sub-object */
1165                 UnlockDatabaseObject(object->classId, object->objectId, 0,
1166                                                          AccessExclusiveLock);
1167 }
1168
1169 /*
1170  * recordDependencyOnExpr - find expression dependencies
1171  *
1172  * This is used to find the dependencies of rules, constraint expressions,
1173  * etc.
1174  *
1175  * Given an expression or query in node-tree form, find all the objects
1176  * it refers to (tables, columns, operators, functions, etc).  Record
1177  * a dependency of the specified type from the given depender object
1178  * to each object mentioned in the expression.
1179  *
1180  * rtable is the rangetable to be used to interpret Vars with varlevelsup=0.
1181  * It can be NIL if no such variables are expected.
1182  */
1183 void
1184 recordDependencyOnExpr(const ObjectAddress *depender,
1185                                            Node *expr, List *rtable,
1186                                            DependencyType behavior)
1187 {
1188         find_expr_references_context context;
1189
1190         context.addrs = new_object_addresses();
1191
1192         /* Set up interpretation for Vars at varlevelsup = 0 */
1193         context.rtables = list_make1(rtable);
1194
1195         /* Scan the expression tree for referenceable objects */
1196         find_expr_references_walker(expr, &context);
1197
1198         /* Remove any duplicates */
1199         eliminate_duplicate_dependencies(context.addrs);
1200
1201         /* And record 'em */
1202         recordMultipleDependencies(depender,
1203                                                            context.addrs->refs, context.addrs->numrefs,
1204                                                            behavior);
1205
1206         free_object_addresses(context.addrs);
1207 }
1208
1209 /*
1210  * recordDependencyOnSingleRelExpr - find expression dependencies
1211  *
1212  * As above, but only one relation is expected to be referenced (with
1213  * varno = 1 and varlevelsup = 0).      Pass the relation OID instead of a
1214  * range table.  An additional frammish is that dependencies on that
1215  * relation (or its component columns) will be marked with 'self_behavior',
1216  * whereas 'behavior' is used for everything else.
1217  */
1218 void
1219 recordDependencyOnSingleRelExpr(const ObjectAddress *depender,
1220                                                                 Node *expr, Oid relId,
1221                                                                 DependencyType behavior,
1222                                                                 DependencyType self_behavior)
1223 {
1224         find_expr_references_context context;
1225         RangeTblEntry rte;
1226
1227         context.addrs = new_object_addresses();
1228
1229         /* We gin up a rather bogus rangetable list to handle Vars */
1230         MemSet(&rte, 0, sizeof(rte));
1231         rte.type = T_RangeTblEntry;
1232         rte.rtekind = RTE_RELATION;
1233         rte.relid = relId;
1234
1235         context.rtables = list_make1(list_make1(&rte));
1236
1237         /* Scan the expression tree for referenceable objects */
1238         find_expr_references_walker(expr, &context);
1239
1240         /* Remove any duplicates */
1241         eliminate_duplicate_dependencies(context.addrs);
1242
1243         /* Separate self-dependencies if necessary */
1244         if (behavior != self_behavior && context.addrs->numrefs > 0)
1245         {
1246                 ObjectAddresses *self_addrs;
1247                 ObjectAddress *outobj;
1248                 int                     oldref,
1249                                         outrefs;
1250
1251                 self_addrs = new_object_addresses();
1252
1253                 outobj = context.addrs->refs;
1254                 outrefs = 0;
1255                 for (oldref = 0; oldref < context.addrs->numrefs; oldref++)
1256                 {
1257                         ObjectAddress *thisobj = context.addrs->refs + oldref;
1258
1259                         if (thisobj->classId == RelationRelationId &&
1260                                 thisobj->objectId == relId)
1261                         {
1262                                 /* Move this ref into self_addrs */
1263                                 add_exact_object_address(thisobj, self_addrs);
1264                         }
1265                         else
1266                         {
1267                                 /* Keep it in context.addrs */
1268                                 *outobj = *thisobj;
1269                                 outobj++;
1270                                 outrefs++;
1271                         }
1272                 }
1273                 context.addrs->numrefs = outrefs;
1274
1275                 /* Record the self-dependencies */
1276                 recordMultipleDependencies(depender,
1277                                                                    self_addrs->refs, self_addrs->numrefs,
1278                                                                    self_behavior);
1279
1280                 free_object_addresses(self_addrs);
1281         }
1282
1283         /* Record the external dependencies */
1284         recordMultipleDependencies(depender,
1285                                                            context.addrs->refs, context.addrs->numrefs,
1286                                                            behavior);
1287
1288         free_object_addresses(context.addrs);
1289 }
1290
1291 /*
1292  * Recursively search an expression tree for object references.
1293  *
1294  * Note: we avoid creating references to columns of tables that participate
1295  * in an SQL JOIN construct, but are not actually used anywhere in the query.
1296  * To do so, we do not scan the joinaliasvars list of a join RTE while
1297  * scanning the query rangetable, but instead scan each individual entry
1298  * of the alias list when we find a reference to it.
1299  *
1300  * Note: in many cases we do not need to create dependencies on the datatypes
1301  * involved in an expression, because we'll have an indirect dependency via
1302  * some other object.  For instance Var nodes depend on a column which depends
1303  * on the datatype, and OpExpr nodes depend on the operator which depends on
1304  * the datatype.  However we do need a type dependency if there is no such
1305  * indirect dependency, as for example in Const and CoerceToDomain nodes.
1306  */
1307 static bool
1308 find_expr_references_walker(Node *node,
1309                                                         find_expr_references_context *context)
1310 {
1311         if (node == NULL)
1312                 return false;
1313         if (IsA(node, Var))
1314         {
1315                 Var                *var = (Var *) node;
1316                 List       *rtable;
1317                 RangeTblEntry *rte;
1318
1319                 /* Find matching rtable entry, or complain if not found */
1320                 if (var->varlevelsup >= list_length(context->rtables))
1321                         elog(ERROR, "invalid varlevelsup %d", var->varlevelsup);
1322                 rtable = (List *) list_nth(context->rtables, var->varlevelsup);
1323                 if (var->varno <= 0 || var->varno > list_length(rtable))
1324                         elog(ERROR, "invalid varno %d", var->varno);
1325                 rte = rt_fetch(var->varno, rtable);
1326
1327                 /*
1328                  * A whole-row Var references no specific columns, so adds no new
1329                  * dependency.
1330                  */
1331                 if (var->varattno == InvalidAttrNumber)
1332                         return false;
1333                 if (rte->rtekind == RTE_RELATION)
1334                 {
1335                         /* If it's a plain relation, reference this column */
1336                         add_object_address(OCLASS_CLASS, rte->relid, var->varattno,
1337                                                            context->addrs);
1338                 }
1339                 else if (rte->rtekind == RTE_JOIN)
1340                 {
1341                         /* Scan join output column to add references to join inputs */
1342                         List       *save_rtables;
1343
1344                         /* We must make the context appropriate for join's level */
1345                         save_rtables = context->rtables;
1346                         context->rtables = list_copy_tail(context->rtables,
1347                                                                                           var->varlevelsup);
1348                         if (var->varattno <= 0 ||
1349                                 var->varattno > list_length(rte->joinaliasvars))
1350                                 elog(ERROR, "invalid varattno %d", var->varattno);
1351                         find_expr_references_walker((Node *) list_nth(rte->joinaliasvars,
1352                                                                                                                   var->varattno - 1),
1353                                                                                 context);
1354                         list_free(context->rtables);
1355                         context->rtables = save_rtables;
1356                 }
1357                 return false;
1358         }
1359         else if (IsA(node, Const))
1360         {
1361                 Const      *con = (Const *) node;
1362                 Oid                     objoid;
1363
1364                 /* A constant must depend on the constant's datatype */
1365                 add_object_address(OCLASS_TYPE, con->consttype, 0,
1366                                                    context->addrs);
1367
1368                 /*
1369                  * If it's a regclass or similar literal referring to an existing
1370                  * object, add a reference to that object.      (Currently, only the
1371                  * regclass and regconfig cases have any likely use, but we may as
1372                  * well handle all the OID-alias datatypes consistently.)
1373                  */
1374                 if (!con->constisnull)
1375                 {
1376                         switch (con->consttype)
1377                         {
1378                                 case REGPROCOID:
1379                                 case REGPROCEDUREOID:
1380                                         objoid = DatumGetObjectId(con->constvalue);
1381                                         if (SearchSysCacheExists(PROCOID,
1382                                                                                          ObjectIdGetDatum(objoid),
1383                                                                                          0, 0, 0))
1384                                                 add_object_address(OCLASS_PROC, objoid, 0,
1385                                                                                    context->addrs);
1386                                         break;
1387                                 case REGOPEROID:
1388                                 case REGOPERATOROID:
1389                                         objoid = DatumGetObjectId(con->constvalue);
1390                                         if (SearchSysCacheExists(OPEROID,
1391                                                                                          ObjectIdGetDatum(objoid),
1392                                                                                          0, 0, 0))
1393                                                 add_object_address(OCLASS_OPERATOR, objoid, 0,
1394                                                                                    context->addrs);
1395                                         break;
1396                                 case REGCLASSOID:
1397                                         objoid = DatumGetObjectId(con->constvalue);
1398                                         if (SearchSysCacheExists(RELOID,
1399                                                                                          ObjectIdGetDatum(objoid),
1400                                                                                          0, 0, 0))
1401                                                 add_object_address(OCLASS_CLASS, objoid, 0,
1402                                                                                    context->addrs);
1403                                         break;
1404                                 case REGTYPEOID:
1405                                         objoid = DatumGetObjectId(con->constvalue);
1406                                         if (SearchSysCacheExists(TYPEOID,
1407                                                                                          ObjectIdGetDatum(objoid),
1408                                                                                          0, 0, 0))
1409                                                 add_object_address(OCLASS_TYPE, objoid, 0,
1410                                                                                    context->addrs);
1411                                         break;
1412                                 case REGCONFIGOID:
1413                                         objoid = DatumGetObjectId(con->constvalue);
1414                                         if (SearchSysCacheExists(TSCONFIGOID,
1415                                                                                          ObjectIdGetDatum(objoid),
1416                                                                                          0, 0, 0))
1417                                                 add_object_address(OCLASS_TSCONFIG, objoid, 0,
1418                                                                                    context->addrs);
1419                                         break;
1420                                 case REGDICTIONARYOID:
1421                                         objoid = DatumGetObjectId(con->constvalue);
1422                                         if (SearchSysCacheExists(TSDICTOID,
1423                                                                                          ObjectIdGetDatum(objoid),
1424                                                                                          0, 0, 0))
1425                                                 add_object_address(OCLASS_TSDICT, objoid, 0,
1426                                                                                    context->addrs);
1427                                         break;
1428                         }
1429                 }
1430                 return false;
1431         }
1432         else if (IsA(node, Param))
1433         {
1434                 Param      *param = (Param *) node;
1435
1436                 /* A parameter must depend on the parameter's datatype */
1437                 add_object_address(OCLASS_TYPE, param->paramtype, 0,
1438                                                    context->addrs);
1439         }
1440         else if (IsA(node, FuncExpr))
1441         {
1442                 FuncExpr   *funcexpr = (FuncExpr *) node;
1443
1444                 add_object_address(OCLASS_PROC, funcexpr->funcid, 0,
1445                                                    context->addrs);
1446                 /* fall through to examine arguments */
1447         }
1448         else if (IsA(node, OpExpr))
1449         {
1450                 OpExpr     *opexpr = (OpExpr *) node;
1451
1452                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1453                                                    context->addrs);
1454                 /* fall through to examine arguments */
1455         }
1456         else if (IsA(node, DistinctExpr))
1457         {
1458                 DistinctExpr *distinctexpr = (DistinctExpr *) node;
1459
1460                 add_object_address(OCLASS_OPERATOR, distinctexpr->opno, 0,
1461                                                    context->addrs);
1462                 /* fall through to examine arguments */
1463         }
1464         else if (IsA(node, ScalarArrayOpExpr))
1465         {
1466                 ScalarArrayOpExpr *opexpr = (ScalarArrayOpExpr *) node;
1467
1468                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1469                                                    context->addrs);
1470                 /* fall through to examine arguments */
1471         }
1472         else if (IsA(node, NullIfExpr))
1473         {
1474                 NullIfExpr *nullifexpr = (NullIfExpr *) node;
1475
1476                 add_object_address(OCLASS_OPERATOR, nullifexpr->opno, 0,
1477                                                    context->addrs);
1478                 /* fall through to examine arguments */
1479         }
1480         else if (IsA(node, Aggref))
1481         {
1482                 Aggref     *aggref = (Aggref *) node;
1483
1484                 add_object_address(OCLASS_PROC, aggref->aggfnoid, 0,
1485                                                    context->addrs);
1486                 /* fall through to examine arguments */
1487         }
1488         else if (IsA(node, WindowFunc))
1489         {
1490                 WindowFunc *wfunc = (WindowFunc *) node;
1491
1492                 add_object_address(OCLASS_PROC, wfunc->winfnoid, 0,
1493                                                    context->addrs);
1494                 /* fall through to examine arguments */
1495         }
1496         else if (IsA(node, SubPlan))
1497         {
1498                 /* Extra work needed here if we ever need this case */
1499                 elog(ERROR, "already-planned subqueries not supported");
1500         }
1501         else if (IsA(node, RelabelType))
1502         {
1503                 RelabelType *relab = (RelabelType *) node;
1504
1505                 /* since there is no function dependency, need to depend on type */
1506                 add_object_address(OCLASS_TYPE, relab->resulttype, 0,
1507                                                    context->addrs);
1508         }
1509         else if (IsA(node, CoerceViaIO))
1510         {
1511                 CoerceViaIO *iocoerce = (CoerceViaIO *) node;
1512
1513                 /* since there is no exposed function, need to depend on type */
1514                 add_object_address(OCLASS_TYPE, iocoerce->resulttype, 0,
1515                                                    context->addrs);
1516         }
1517         else if (IsA(node, ArrayCoerceExpr))
1518         {
1519                 ArrayCoerceExpr *acoerce = (ArrayCoerceExpr *) node;
1520
1521                 if (OidIsValid(acoerce->elemfuncid))
1522                         add_object_address(OCLASS_PROC, acoerce->elemfuncid, 0,
1523                                                            context->addrs);
1524                 add_object_address(OCLASS_TYPE, acoerce->resulttype, 0,
1525                                                    context->addrs);
1526                 /* fall through to examine arguments */
1527         }
1528         else if (IsA(node, ConvertRowtypeExpr))
1529         {
1530                 ConvertRowtypeExpr *cvt = (ConvertRowtypeExpr *) node;
1531
1532                 /* since there is no function dependency, need to depend on type */
1533                 add_object_address(OCLASS_TYPE, cvt->resulttype, 0,
1534                                                    context->addrs);
1535         }
1536         else if (IsA(node, RowExpr))
1537         {
1538                 RowExpr    *rowexpr = (RowExpr *) node;
1539
1540                 add_object_address(OCLASS_TYPE, rowexpr->row_typeid, 0,
1541                                                    context->addrs);
1542         }
1543         else if (IsA(node, RowCompareExpr))
1544         {
1545                 RowCompareExpr *rcexpr = (RowCompareExpr *) node;
1546                 ListCell   *l;
1547
1548                 foreach(l, rcexpr->opnos)
1549                 {
1550                         add_object_address(OCLASS_OPERATOR, lfirst_oid(l), 0,
1551                                                            context->addrs);
1552                 }
1553                 foreach(l, rcexpr->opfamilies)
1554                 {
1555                         add_object_address(OCLASS_OPFAMILY, lfirst_oid(l), 0,
1556                                                            context->addrs);
1557                 }
1558                 /* fall through to examine arguments */
1559         }
1560         else if (IsA(node, CoerceToDomain))
1561         {
1562                 CoerceToDomain *cd = (CoerceToDomain *) node;
1563
1564                 add_object_address(OCLASS_TYPE, cd->resulttype, 0,
1565                                                    context->addrs);
1566         }
1567         else if (IsA(node, SortGroupClause))
1568         {
1569                 SortGroupClause *sgc = (SortGroupClause *) node;
1570
1571                 add_object_address(OCLASS_OPERATOR, sgc->eqop, 0,
1572                                                    context->addrs);
1573                 if (OidIsValid(sgc->sortop))
1574                         add_object_address(OCLASS_OPERATOR, sgc->sortop, 0,
1575                                                            context->addrs);
1576                 return false;
1577         }
1578         else if (IsA(node, Query))
1579         {
1580                 /* Recurse into RTE subquery or not-yet-planned sublink subquery */
1581                 Query      *query = (Query *) node;
1582                 ListCell   *rtable;
1583                 bool            result;
1584
1585                 /*
1586                  * Add whole-relation refs for each plain relation mentioned in the
1587                  * subquery's rtable, as well as datatype refs for any datatypes used
1588                  * as a RECORD function's output.  (Note: query_tree_walker takes care
1589                  * of recursing into RTE_FUNCTION RTEs, subqueries, etc, so no need to
1590                  * do that here.  But keep it from looking at join alias lists.)
1591                  */
1592                 foreach(rtable, query->rtable)
1593                 {
1594                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
1595                         ListCell   *ct;
1596
1597                         switch (rte->rtekind)
1598                         {
1599                                 case RTE_RELATION:
1600                                         add_object_address(OCLASS_CLASS, rte->relid, 0,
1601                                                                            context->addrs);
1602                                         break;
1603                                 case RTE_FUNCTION:
1604                                         foreach(ct, rte->funccoltypes)
1605                                         {
1606                                                 add_object_address(OCLASS_TYPE, lfirst_oid(ct), 0,
1607                                                                                    context->addrs);
1608                                         }
1609                                         break;
1610                                 default:
1611                                         break;
1612                         }
1613                 }
1614
1615                 /* query_tree_walker ignores ORDER BY etc, but we need those opers */
1616                 find_expr_references_walker((Node *) query->sortClause, context);
1617                 find_expr_references_walker((Node *) query->groupClause, context);
1618                 find_expr_references_walker((Node *) query->windowClause, context);
1619                 find_expr_references_walker((Node *) query->distinctClause, context);
1620
1621                 /* Examine substructure of query */
1622                 context->rtables = lcons(query->rtable, context->rtables);
1623                 result = query_tree_walker(query,
1624                                                                    find_expr_references_walker,
1625                                                                    (void *) context,
1626                                                                    QTW_IGNORE_JOINALIASES);
1627                 context->rtables = list_delete_first(context->rtables);
1628                 return result;
1629         }
1630         else if (IsA(node, SetOperationStmt))
1631         {
1632                 SetOperationStmt *setop = (SetOperationStmt *) node;
1633
1634                 /* we need to look at the groupClauses for operator references */
1635                 find_expr_references_walker((Node *) setop->groupClauses, context);
1636                 /* fall through to examine child nodes */
1637         }
1638
1639         return expression_tree_walker(node, find_expr_references_walker,
1640                                                                   (void *) context);
1641 }
1642
1643 /*
1644  * Given an array of dependency references, eliminate any duplicates.
1645  */
1646 static void
1647 eliminate_duplicate_dependencies(ObjectAddresses *addrs)
1648 {
1649         ObjectAddress *priorobj;
1650         int                     oldref,
1651                                 newrefs;
1652
1653         /*
1654          * We can't sort if the array has "extra" data, because there's no way
1655          * to keep it in sync.  Fortunately that combination of features is
1656          * not needed.
1657          */
1658         Assert(!addrs->extras);
1659
1660         if (addrs->numrefs <= 1)
1661                 return;                                 /* nothing to do */
1662
1663         /* Sort the refs so that duplicates are adjacent */
1664         qsort((void *) addrs->refs, addrs->numrefs, sizeof(ObjectAddress),
1665                   object_address_comparator);
1666
1667         /* Remove dups */
1668         priorobj = addrs->refs;
1669         newrefs = 1;
1670         for (oldref = 1; oldref < addrs->numrefs; oldref++)
1671         {
1672                 ObjectAddress *thisobj = addrs->refs + oldref;
1673
1674                 if (priorobj->classId == thisobj->classId &&
1675                         priorobj->objectId == thisobj->objectId)
1676                 {
1677                         if (priorobj->objectSubId == thisobj->objectSubId)
1678                                 continue;               /* identical, so drop thisobj */
1679
1680                         /*
1681                          * If we have a whole-object reference and a reference to a part
1682                          * of the same object, we don't need the whole-object reference
1683                          * (for example, we don't need to reference both table foo and
1684                          * column foo.bar).  The whole-object reference will always appear
1685                          * first in the sorted list.
1686                          */
1687                         if (priorobj->objectSubId == 0)
1688                         {
1689                                 /* replace whole ref with partial */
1690                                 priorobj->objectSubId = thisobj->objectSubId;
1691                                 continue;
1692                         }
1693                 }
1694                 /* Not identical, so add thisobj to output set */
1695                 priorobj++;
1696                 *priorobj = *thisobj;
1697                 newrefs++;
1698         }
1699
1700         addrs->numrefs = newrefs;
1701 }
1702
1703 /*
1704  * qsort comparator for ObjectAddress items
1705  */
1706 static int
1707 object_address_comparator(const void *a, const void *b)
1708 {
1709         const ObjectAddress *obja = (const ObjectAddress *) a;
1710         const ObjectAddress *objb = (const ObjectAddress *) b;
1711
1712         if (obja->classId < objb->classId)
1713                 return -1;
1714         if (obja->classId > objb->classId)
1715                 return 1;
1716         if (obja->objectId < objb->objectId)
1717                 return -1;
1718         if (obja->objectId > objb->objectId)
1719                 return 1;
1720
1721         /*
1722          * We sort the subId as an unsigned int so that 0 will come first. See
1723          * logic in eliminate_duplicate_dependencies.
1724          */
1725         if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId)
1726                 return -1;
1727         if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId)
1728                 return 1;
1729         return 0;
1730 }
1731
1732 /*
1733  * Routines for handling an expansible array of ObjectAddress items.
1734  *
1735  * new_object_addresses: create a new ObjectAddresses array.
1736  */
1737 ObjectAddresses *
1738 new_object_addresses(void)
1739 {
1740         ObjectAddresses *addrs;
1741
1742         addrs = palloc(sizeof(ObjectAddresses));
1743
1744         addrs->numrefs = 0;
1745         addrs->maxrefs = 32;
1746         addrs->refs = (ObjectAddress *)
1747                 palloc(addrs->maxrefs * sizeof(ObjectAddress));
1748         addrs->extras = NULL;           /* until/unless needed */
1749
1750         return addrs;
1751 }
1752
1753 /*
1754  * Add an entry to an ObjectAddresses array.
1755  *
1756  * It is convenient to specify the class by ObjectClass rather than directly
1757  * by catalog OID.
1758  */
1759 static void
1760 add_object_address(ObjectClass oclass, Oid objectId, int32 subId,
1761                                    ObjectAddresses *addrs)
1762 {
1763         ObjectAddress *item;
1764
1765         /* enlarge array if needed */
1766         if (addrs->numrefs >= addrs->maxrefs)
1767         {
1768                 addrs->maxrefs *= 2;
1769                 addrs->refs = (ObjectAddress *)
1770                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1771                 Assert(!addrs->extras);
1772         }
1773         /* record this item */
1774         item = addrs->refs + addrs->numrefs;
1775         item->classId = object_classes[oclass];
1776         item->objectId = objectId;
1777         item->objectSubId = subId;
1778         addrs->numrefs++;
1779 }
1780
1781 /*
1782  * Add an entry to an ObjectAddresses array.
1783  *
1784  * As above, but specify entry exactly.
1785  */
1786 void
1787 add_exact_object_address(const ObjectAddress *object,
1788                                                  ObjectAddresses *addrs)
1789 {
1790         ObjectAddress *item;
1791
1792         /* enlarge array if needed */
1793         if (addrs->numrefs >= addrs->maxrefs)
1794         {
1795                 addrs->maxrefs *= 2;
1796                 addrs->refs = (ObjectAddress *)
1797                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1798                 Assert(!addrs->extras);
1799         }
1800         /* record this item */
1801         item = addrs->refs + addrs->numrefs;
1802         *item = *object;
1803         addrs->numrefs++;
1804 }
1805
1806 /*
1807  * Add an entry to an ObjectAddresses array.
1808  *
1809  * As above, but specify entry exactly and provide some "extra" data too.
1810  */
1811 static void
1812 add_exact_object_address_extra(const ObjectAddress *object,
1813                                                            const ObjectAddressExtra *extra,
1814                                                            ObjectAddresses *addrs)
1815 {
1816         ObjectAddress *item;
1817         ObjectAddressExtra *itemextra;
1818
1819         /* allocate extra space if first time */
1820         if (!addrs->extras)
1821                 addrs->extras = (ObjectAddressExtra *)
1822                         palloc(addrs->maxrefs * sizeof(ObjectAddressExtra));
1823
1824         /* enlarge array if needed */
1825         if (addrs->numrefs >= addrs->maxrefs)
1826         {
1827                 addrs->maxrefs *= 2;
1828                 addrs->refs = (ObjectAddress *)
1829                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1830                 addrs->extras = (ObjectAddressExtra *)
1831                   repalloc(addrs->extras, addrs->maxrefs * sizeof(ObjectAddressExtra));
1832         }
1833         /* record this item */
1834         item = addrs->refs + addrs->numrefs;
1835         *item = *object;
1836         itemextra = addrs->extras + addrs->numrefs;
1837         *itemextra = *extra;
1838         addrs->numrefs++;
1839 }
1840
1841 /*
1842  * Test whether an object is present in an ObjectAddresses array.
1843  *
1844  * We return "true" if object is a subobject of something in the array, too.
1845  */
1846 bool
1847 object_address_present(const ObjectAddress *object,
1848                                            const ObjectAddresses *addrs)
1849 {
1850         int                     i;
1851
1852         for (i = addrs->numrefs - 1; i >= 0; i--)
1853         {
1854                 const ObjectAddress *thisobj = addrs->refs + i;
1855
1856                 if (object->classId == thisobj->classId &&
1857                         object->objectId == thisobj->objectId)
1858                 {
1859                         if (object->objectSubId == thisobj->objectSubId ||
1860                                 thisobj->objectSubId == 0)
1861                                 return true;
1862                 }
1863         }
1864
1865         return false;
1866 }
1867
1868 /*
1869  * As above, except that if the object is present then also OR the given
1870  * flags into its associated extra data (which must exist).
1871  */
1872 static bool
1873 object_address_present_add_flags(const ObjectAddress *object,
1874                                                                  int flags,
1875                                                                  ObjectAddresses *addrs)
1876 {
1877         int                     i;
1878
1879         for (i = addrs->numrefs - 1; i >= 0; i--)
1880         {
1881                 ObjectAddress *thisobj = addrs->refs + i;
1882
1883                 if (object->classId == thisobj->classId &&
1884                         object->objectId == thisobj->objectId)
1885                 {
1886                         if (object->objectSubId == thisobj->objectSubId)
1887                         {
1888                                 ObjectAddressExtra *thisextra = addrs->extras + i;
1889
1890                                 thisextra->flags |= flags;
1891                                 return true;
1892                         }
1893                         if (thisobj->objectSubId == 0)
1894                         {
1895                                 /*
1896                                  * We get here if we find a need to delete a column after
1897                                  * having already decided to drop its whole table.  Obviously
1898                                  * we no longer need to drop the column.  But don't plaster
1899                                  * its flags on the table.
1900                                  */
1901                                 return true;
1902                         }
1903                 }
1904         }
1905
1906         return false;
1907 }
1908
1909 /*
1910  * Record multiple dependencies from an ObjectAddresses array, after first
1911  * removing any duplicates.
1912  */
1913 void
1914 record_object_address_dependencies(const ObjectAddress *depender,
1915                                                                    ObjectAddresses *referenced,
1916                                                                    DependencyType behavior)
1917 {
1918         eliminate_duplicate_dependencies(referenced);
1919         recordMultipleDependencies(depender,
1920                                                            referenced->refs, referenced->numrefs,
1921                                                            behavior);
1922 }
1923
1924 /*
1925  * Clean up when done with an ObjectAddresses array.
1926  */
1927 void
1928 free_object_addresses(ObjectAddresses *addrs)
1929 {
1930         pfree(addrs->refs);
1931         if (addrs->extras)
1932                 pfree(addrs->extras);
1933         pfree(addrs);
1934 }
1935
1936 /*
1937  * Determine the class of a given object identified by objectAddress.
1938  *
1939  * This function is essentially the reverse mapping for the object_classes[]
1940  * table.  We implement it as a function because the OIDs aren't consecutive.
1941  */
1942 ObjectClass
1943 getObjectClass(const ObjectAddress *object)
1944 {
1945         switch (object->classId)
1946         {
1947                 case RelationRelationId:
1948                         /* caller must check objectSubId */
1949                         return OCLASS_CLASS;
1950
1951                 case ProcedureRelationId:
1952                         Assert(object->objectSubId == 0);
1953                         return OCLASS_PROC;
1954
1955                 case TypeRelationId:
1956                         Assert(object->objectSubId == 0);
1957                         return OCLASS_TYPE;
1958
1959                 case CastRelationId:
1960                         Assert(object->objectSubId == 0);
1961                         return OCLASS_CAST;
1962
1963                 case ConstraintRelationId:
1964                         Assert(object->objectSubId == 0);
1965                         return OCLASS_CONSTRAINT;
1966
1967                 case ConversionRelationId:
1968                         Assert(object->objectSubId == 0);
1969                         return OCLASS_CONVERSION;
1970
1971                 case AttrDefaultRelationId:
1972                         Assert(object->objectSubId == 0);
1973                         return OCLASS_DEFAULT;
1974
1975                 case LanguageRelationId:
1976                         Assert(object->objectSubId == 0);
1977                         return OCLASS_LANGUAGE;
1978
1979                 case OperatorRelationId:
1980                         Assert(object->objectSubId == 0);
1981                         return OCLASS_OPERATOR;
1982
1983                 case OperatorClassRelationId:
1984                         Assert(object->objectSubId == 0);
1985                         return OCLASS_OPCLASS;
1986
1987                 case OperatorFamilyRelationId:
1988                         Assert(object->objectSubId == 0);
1989                         return OCLASS_OPFAMILY;
1990
1991                 case AccessMethodOperatorRelationId:
1992                         Assert(object->objectSubId == 0);
1993                         return OCLASS_AMOP;
1994
1995                 case AccessMethodProcedureRelationId:
1996                         Assert(object->objectSubId == 0);
1997                         return OCLASS_AMPROC;
1998
1999                 case RewriteRelationId:
2000                         Assert(object->objectSubId == 0);
2001                         return OCLASS_REWRITE;
2002
2003                 case TriggerRelationId:
2004                         Assert(object->objectSubId == 0);
2005                         return OCLASS_TRIGGER;
2006
2007                 case NamespaceRelationId:
2008                         Assert(object->objectSubId == 0);
2009                         return OCLASS_SCHEMA;
2010
2011                 case TSParserRelationId:
2012                         Assert(object->objectSubId == 0);
2013                         return OCLASS_TSPARSER;
2014
2015                 case TSDictionaryRelationId:
2016                         Assert(object->objectSubId == 0);
2017                         return OCLASS_TSDICT;
2018
2019                 case TSTemplateRelationId:
2020                         Assert(object->objectSubId == 0);
2021                         return OCLASS_TSTEMPLATE;
2022
2023                 case TSConfigRelationId:
2024                         Assert(object->objectSubId == 0);
2025                         return OCLASS_TSCONFIG;
2026
2027                 case AuthIdRelationId:
2028                         Assert(object->objectSubId == 0);
2029                         return OCLASS_ROLE;
2030
2031                 case DatabaseRelationId:
2032                         Assert(object->objectSubId == 0);
2033                         return OCLASS_DATABASE;
2034
2035                 case TableSpaceRelationId:
2036                         Assert(object->objectSubId == 0);
2037                         return OCLASS_TBLSPACE;
2038
2039                 case ForeignDataWrapperRelationId:
2040                         Assert(object->objectSubId == 0);
2041                         return OCLASS_FDW;
2042
2043                 case ForeignServerRelationId:
2044                         Assert(object->objectSubId == 0);
2045                         return OCLASS_FOREIGN_SERVER;
2046
2047                 case UserMappingRelationId:
2048                         Assert(object->objectSubId == 0);
2049                         return OCLASS_USER_MAPPING;
2050         }
2051
2052         /* shouldn't get here */
2053         elog(ERROR, "unrecognized object class: %u", object->classId);
2054         return OCLASS_CLASS;            /* keep compiler quiet */
2055 }
2056
2057 /*
2058  * getObjectDescription: build an object description for messages
2059  *
2060  * The result is a palloc'd string.
2061  */
2062 char *
2063 getObjectDescription(const ObjectAddress *object)
2064 {
2065         StringInfoData buffer;
2066
2067         initStringInfo(&buffer);
2068
2069         switch (getObjectClass(object))
2070         {
2071                 case OCLASS_CLASS:
2072                         getRelationDescription(&buffer, object->objectId);
2073                         if (object->objectSubId != 0)
2074                                 appendStringInfo(&buffer, _(" column %s"),
2075                                                                  get_relid_attribute_name(object->objectId,
2076                                                                                                            object->objectSubId));
2077                         break;
2078
2079                 case OCLASS_PROC:
2080                         appendStringInfo(&buffer, _("function %s"),
2081                                                          format_procedure(object->objectId));
2082                         break;
2083
2084                 case OCLASS_TYPE:
2085                         appendStringInfo(&buffer, _("type %s"),
2086                                                          format_type_be(object->objectId));
2087                         break;
2088
2089                 case OCLASS_CAST:
2090                         {
2091                                 Relation        castDesc;
2092                                 ScanKeyData skey[1];
2093                                 SysScanDesc rcscan;
2094                                 HeapTuple       tup;
2095                                 Form_pg_cast castForm;
2096
2097                                 castDesc = heap_open(CastRelationId, AccessShareLock);
2098
2099                                 ScanKeyInit(&skey[0],
2100                                                         ObjectIdAttributeNumber,
2101                                                         BTEqualStrategyNumber, F_OIDEQ,
2102                                                         ObjectIdGetDatum(object->objectId));
2103
2104                                 rcscan = systable_beginscan(castDesc, CastOidIndexId, true,
2105                                                                                         SnapshotNow, 1, skey);
2106
2107                                 tup = systable_getnext(rcscan);
2108
2109                                 if (!HeapTupleIsValid(tup))
2110                                         elog(ERROR, "could not find tuple for cast %u",
2111                                                  object->objectId);
2112
2113                                 castForm = (Form_pg_cast) GETSTRUCT(tup);
2114
2115                                 appendStringInfo(&buffer, _("cast from %s to %s"),
2116                                                                  format_type_be(castForm->castsource),
2117                                                                  format_type_be(castForm->casttarget));
2118
2119                                 systable_endscan(rcscan);
2120                                 heap_close(castDesc, AccessShareLock);
2121                                 break;
2122                         }
2123
2124                 case OCLASS_CONSTRAINT:
2125                         {
2126                                 HeapTuple       conTup;
2127                                 Form_pg_constraint con;
2128
2129                                 conTup = SearchSysCache(CONSTROID,
2130                                                                                 ObjectIdGetDatum(object->objectId),
2131                                                                                 0, 0, 0);
2132                                 if (!HeapTupleIsValid(conTup))
2133                                         elog(ERROR, "cache lookup failed for constraint %u",
2134                                                  object->objectId);
2135                                 con = (Form_pg_constraint) GETSTRUCT(conTup);
2136
2137                                 if (OidIsValid(con->conrelid))
2138                                 {
2139                                         StringInfoData  rel;
2140
2141                                         initStringInfo(&rel);
2142                                         getRelationDescription(&rel, con->conrelid);
2143                                         appendStringInfo(&buffer, _("constraint %s on %s"),
2144                                                                          NameStr(con->conname), rel.data);
2145                                         pfree(rel.data);
2146                                 }
2147                                 else
2148                                 {
2149                                         appendStringInfo(&buffer, _("constraint %s"),
2150                                                                          NameStr(con->conname));
2151                                 }
2152
2153                                 ReleaseSysCache(conTup);
2154                                 break;
2155                         }
2156
2157                 case OCLASS_CONVERSION:
2158                         {
2159                                 HeapTuple       conTup;
2160
2161                                 conTup = SearchSysCache(CONVOID,
2162                                                                                 ObjectIdGetDatum(object->objectId),
2163                                                                                 0, 0, 0);
2164                                 if (!HeapTupleIsValid(conTup))
2165                                         elog(ERROR, "cache lookup failed for conversion %u",
2166                                                  object->objectId);
2167                                 appendStringInfo(&buffer, _("conversion %s"),
2168                                  NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname));
2169                                 ReleaseSysCache(conTup);
2170                                 break;
2171                         }
2172
2173                 case OCLASS_DEFAULT:
2174                         {
2175                                 Relation        attrdefDesc;
2176                                 ScanKeyData skey[1];
2177                                 SysScanDesc adscan;
2178                                 HeapTuple       tup;
2179                                 Form_pg_attrdef attrdef;
2180                                 ObjectAddress colobject;
2181
2182                                 attrdefDesc = heap_open(AttrDefaultRelationId, AccessShareLock);
2183
2184                                 ScanKeyInit(&skey[0],
2185                                                         ObjectIdAttributeNumber,
2186                                                         BTEqualStrategyNumber, F_OIDEQ,
2187                                                         ObjectIdGetDatum(object->objectId));
2188
2189                                 adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndexId,
2190                                                                                         true, SnapshotNow, 1, skey);
2191
2192                                 tup = systable_getnext(adscan);
2193
2194                                 if (!HeapTupleIsValid(tup))
2195                                         elog(ERROR, "could not find tuple for attrdef %u",
2196                                                  object->objectId);
2197
2198                                 attrdef = (Form_pg_attrdef) GETSTRUCT(tup);
2199
2200                                 colobject.classId = RelationRelationId;
2201                                 colobject.objectId = attrdef->adrelid;
2202                                 colobject.objectSubId = attrdef->adnum;
2203
2204                                 appendStringInfo(&buffer, _("default for %s"),
2205                                                                  getObjectDescription(&colobject));
2206
2207                                 systable_endscan(adscan);
2208                                 heap_close(attrdefDesc, AccessShareLock);
2209                                 break;
2210                         }
2211
2212                 case OCLASS_LANGUAGE:
2213                         {
2214                                 HeapTuple       langTup;
2215
2216                                 langTup = SearchSysCache(LANGOID,
2217                                                                                  ObjectIdGetDatum(object->objectId),
2218                                                                                  0, 0, 0);
2219                                 if (!HeapTupleIsValid(langTup))
2220                                         elog(ERROR, "cache lookup failed for language %u",
2221                                                  object->objectId);
2222                                 appendStringInfo(&buffer, _("language %s"),
2223                                   NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname));
2224                                 ReleaseSysCache(langTup);
2225                                 break;
2226                         }
2227
2228                 case OCLASS_OPERATOR:
2229                         appendStringInfo(&buffer, _("operator %s"),
2230                                                          format_operator(object->objectId));
2231                         break;
2232
2233                 case OCLASS_OPCLASS:
2234                         {
2235                                 HeapTuple       opcTup;
2236                                 Form_pg_opclass opcForm;
2237                                 HeapTuple       amTup;
2238                                 Form_pg_am      amForm;
2239                                 char       *nspname;
2240
2241                                 opcTup = SearchSysCache(CLAOID,
2242                                                                                 ObjectIdGetDatum(object->objectId),
2243                                                                                 0, 0, 0);
2244                                 if (!HeapTupleIsValid(opcTup))
2245                                         elog(ERROR, "cache lookup failed for opclass %u",
2246                                                  object->objectId);
2247                                 opcForm = (Form_pg_opclass) GETSTRUCT(opcTup);
2248
2249                                 amTup = SearchSysCache(AMOID,
2250                                                                            ObjectIdGetDatum(opcForm->opcmethod),
2251                                                                            0, 0, 0);
2252                                 if (!HeapTupleIsValid(amTup))
2253                                         elog(ERROR, "cache lookup failed for access method %u",
2254                                                  opcForm->opcmethod);
2255                                 amForm = (Form_pg_am) GETSTRUCT(amTup);
2256
2257                                 /* Qualify the name if not visible in search path */
2258                                 if (OpclassIsVisible(object->objectId))
2259                                         nspname = NULL;
2260                                 else
2261                                         nspname = get_namespace_name(opcForm->opcnamespace);
2262
2263                                 appendStringInfo(&buffer, _("operator class %s for access method %s"),
2264                                                                  quote_qualified_identifier(nspname,
2265                                                                                                   NameStr(opcForm->opcname)),
2266                                                                  NameStr(amForm->amname));
2267
2268                                 ReleaseSysCache(amTup);
2269                                 ReleaseSysCache(opcTup);
2270                                 break;
2271                         }
2272
2273                 case OCLASS_OPFAMILY:
2274                         getOpFamilyDescription(&buffer, object->objectId);
2275                         break;
2276
2277                 case OCLASS_AMOP:
2278                         {
2279                                 Relation        amopDesc;
2280                                 ScanKeyData skey[1];
2281                                 SysScanDesc amscan;
2282                                 HeapTuple       tup;
2283                                 Form_pg_amop amopForm;
2284                                 StringInfoData opfam;
2285
2286                                 amopDesc = heap_open(AccessMethodOperatorRelationId,
2287                                                                          AccessShareLock);
2288
2289                                 ScanKeyInit(&skey[0],
2290                                                         ObjectIdAttributeNumber,
2291                                                         BTEqualStrategyNumber, F_OIDEQ,
2292                                                         ObjectIdGetDatum(object->objectId));
2293
2294                                 amscan = systable_beginscan(amopDesc, AccessMethodOperatorOidIndexId, true,
2295                                                                                         SnapshotNow, 1, skey);
2296
2297                                 tup = systable_getnext(amscan);
2298
2299                                 if (!HeapTupleIsValid(tup))
2300                                         elog(ERROR, "could not find tuple for amop entry %u",
2301                                                  object->objectId);
2302
2303                                 amopForm = (Form_pg_amop) GETSTRUCT(tup);
2304
2305                                 initStringInfo(&opfam);
2306                                 getOpFamilyDescription(&opfam, amopForm->amopfamily);
2307                                 /*
2308                                  * translator: %d is the operator strategy (a number), the
2309                                  * first %s is the textual form of the operator, and the second
2310                                  * %s is the description of the operator family.
2311                                  */
2312                                 appendStringInfo(&buffer, _("operator %d %s of %s"),
2313                                                                  amopForm->amopstrategy,
2314                                                                  format_operator(amopForm->amopopr),
2315                                                                  opfam.data);
2316                                 pfree(opfam.data);
2317
2318                                 systable_endscan(amscan);
2319                                 heap_close(amopDesc, AccessShareLock);
2320                                 break;
2321                         }
2322
2323                 case OCLASS_AMPROC:
2324                         {
2325                                 Relation        amprocDesc;
2326                                 ScanKeyData skey[1];
2327                                 SysScanDesc amscan;
2328                                 HeapTuple       tup;
2329                                 Form_pg_amproc amprocForm;
2330                                 StringInfoData opfam;
2331
2332                                 amprocDesc = heap_open(AccessMethodProcedureRelationId,
2333                                                                            AccessShareLock);
2334
2335                                 ScanKeyInit(&skey[0],
2336                                                         ObjectIdAttributeNumber,
2337                                                         BTEqualStrategyNumber, F_OIDEQ,
2338                                                         ObjectIdGetDatum(object->objectId));
2339
2340                                 amscan = systable_beginscan(amprocDesc, AccessMethodProcedureOidIndexId, true,
2341                                                                                         SnapshotNow, 1, skey);
2342
2343                                 tup = systable_getnext(amscan);
2344
2345                                 if (!HeapTupleIsValid(tup))
2346                                         elog(ERROR, "could not find tuple for amproc entry %u",
2347                                                  object->objectId);
2348
2349                                 amprocForm = (Form_pg_amproc) GETSTRUCT(tup);
2350
2351                                 initStringInfo(&opfam);
2352                                 getOpFamilyDescription(&opfam, amprocForm->amprocfamily);
2353                                 /*
2354                                  * translator: %d is the function number, the first %s is the
2355                                  * textual form of the function with arguments, and the second
2356                                  * %s is the description of the operator family.
2357                                  */
2358                                 appendStringInfo(&buffer, _("function %d %s of %s"),
2359                                                                  amprocForm->amprocnum,
2360                                                                  format_procedure(amprocForm->amproc),
2361                                                                  opfam.data);
2362                                 pfree(opfam.data);
2363
2364                                 systable_endscan(amscan);
2365                                 heap_close(amprocDesc, AccessShareLock);
2366                                 break;
2367                         }
2368
2369                 case OCLASS_REWRITE:
2370                         {
2371                                 Relation        ruleDesc;
2372                                 ScanKeyData skey[1];
2373                                 SysScanDesc rcscan;
2374                                 HeapTuple       tup;
2375                                 Form_pg_rewrite rule;
2376
2377                                 ruleDesc = heap_open(RewriteRelationId, AccessShareLock);
2378
2379                                 ScanKeyInit(&skey[0],
2380                                                         ObjectIdAttributeNumber,
2381                                                         BTEqualStrategyNumber, F_OIDEQ,
2382                                                         ObjectIdGetDatum(object->objectId));
2383
2384                                 rcscan = systable_beginscan(ruleDesc, RewriteOidIndexId, true,
2385                                                                                         SnapshotNow, 1, skey);
2386
2387                                 tup = systable_getnext(rcscan);
2388
2389                                 if (!HeapTupleIsValid(tup))
2390                                         elog(ERROR, "could not find tuple for rule %u",
2391                                                  object->objectId);
2392
2393                                 rule = (Form_pg_rewrite) GETSTRUCT(tup);
2394
2395                                 appendStringInfo(&buffer, _("rule %s on "),
2396                                                                  NameStr(rule->rulename));
2397                                 getRelationDescription(&buffer, rule->ev_class);
2398
2399                                 systable_endscan(rcscan);
2400                                 heap_close(ruleDesc, AccessShareLock);
2401                                 break;
2402                         }
2403
2404                 case OCLASS_TRIGGER:
2405                         {
2406                                 Relation        trigDesc;
2407                                 ScanKeyData skey[1];
2408                                 SysScanDesc tgscan;
2409                                 HeapTuple       tup;
2410                                 Form_pg_trigger trig;
2411
2412                                 trigDesc = heap_open(TriggerRelationId, AccessShareLock);
2413
2414                                 ScanKeyInit(&skey[0],
2415                                                         ObjectIdAttributeNumber,
2416                                                         BTEqualStrategyNumber, F_OIDEQ,
2417                                                         ObjectIdGetDatum(object->objectId));
2418
2419                                 tgscan = systable_beginscan(trigDesc, TriggerOidIndexId, true,
2420                                                                                         SnapshotNow, 1, skey);
2421
2422                                 tup = systable_getnext(tgscan);
2423
2424                                 if (!HeapTupleIsValid(tup))
2425                                         elog(ERROR, "could not find tuple for trigger %u",
2426                                                  object->objectId);
2427
2428                                 trig = (Form_pg_trigger) GETSTRUCT(tup);
2429
2430                                 appendStringInfo(&buffer, _("trigger %s on "),
2431                                                                  NameStr(trig->tgname));
2432                                 getRelationDescription(&buffer, trig->tgrelid);
2433
2434                                 systable_endscan(tgscan);
2435                                 heap_close(trigDesc, AccessShareLock);
2436                                 break;
2437                         }
2438
2439                 case OCLASS_SCHEMA:
2440                         {
2441                                 char       *nspname;
2442
2443                                 nspname = get_namespace_name(object->objectId);
2444                                 if (!nspname)
2445                                         elog(ERROR, "cache lookup failed for namespace %u",
2446                                                  object->objectId);
2447                                 appendStringInfo(&buffer, _("schema %s"), nspname);
2448                                 break;
2449                         }
2450
2451                 case OCLASS_TSPARSER:
2452                         {
2453                                 HeapTuple       tup;
2454
2455                                 tup = SearchSysCache(TSPARSEROID,
2456                                                                          ObjectIdGetDatum(object->objectId),
2457                                                                          0, 0, 0);
2458                                 if (!HeapTupleIsValid(tup))
2459                                         elog(ERROR, "cache lookup failed for text search parser %u",
2460                                                  object->objectId);
2461                                 appendStringInfo(&buffer, _("text search parser %s"),
2462                                          NameStr(((Form_pg_ts_parser) GETSTRUCT(tup))->prsname));
2463                                 ReleaseSysCache(tup);
2464                                 break;
2465                         }
2466
2467                 case OCLASS_TSDICT:
2468                         {
2469                                 HeapTuple       tup;
2470
2471                                 tup = SearchSysCache(TSDICTOID,
2472                                                                          ObjectIdGetDatum(object->objectId),
2473                                                                          0, 0, 0);
2474                                 if (!HeapTupleIsValid(tup))
2475                                         elog(ERROR, "cache lookup failed for text search dictionary %u",
2476                                                  object->objectId);
2477                                 appendStringInfo(&buffer, _("text search dictionary %s"),
2478                                           NameStr(((Form_pg_ts_dict) GETSTRUCT(tup))->dictname));
2479                                 ReleaseSysCache(tup);
2480                                 break;
2481                         }
2482
2483                 case OCLASS_TSTEMPLATE:
2484                         {
2485                                 HeapTuple       tup;
2486
2487                                 tup = SearchSysCache(TSTEMPLATEOID,
2488                                                                          ObjectIdGetDatum(object->objectId),
2489                                                                          0, 0, 0);
2490                                 if (!HeapTupleIsValid(tup))
2491                                         elog(ERROR, "cache lookup failed for text search template %u",
2492                                                  object->objectId);
2493                                 appendStringInfo(&buffer, _("text search template %s"),
2494                                   NameStr(((Form_pg_ts_template) GETSTRUCT(tup))->tmplname));
2495                                 ReleaseSysCache(tup);
2496                                 break;
2497                         }
2498
2499                 case OCLASS_TSCONFIG:
2500                         {
2501                                 HeapTuple       tup;
2502
2503                                 tup = SearchSysCache(TSCONFIGOID,
2504                                                                          ObjectIdGetDatum(object->objectId),
2505                                                                          0, 0, 0);
2506                                 if (!HeapTupleIsValid(tup))
2507                                         elog(ERROR, "cache lookup failed for text search configuration %u",
2508                                                  object->objectId);
2509                                 appendStringInfo(&buffer, _("text search configuration %s"),
2510                                          NameStr(((Form_pg_ts_config) GETSTRUCT(tup))->cfgname));
2511                                 ReleaseSysCache(tup);
2512                                 break;
2513                         }
2514
2515                 case OCLASS_ROLE:
2516                         {
2517                                 appendStringInfo(&buffer, _("role %s"),
2518                                                                  GetUserNameFromId(object->objectId));
2519                                 break;
2520                         }
2521
2522                 case OCLASS_DATABASE:
2523                         {
2524                                 char       *datname;
2525
2526                                 datname = get_database_name(object->objectId);
2527                                 if (!datname)
2528                                         elog(ERROR, "cache lookup failed for database %u",
2529                                                  object->objectId);
2530                                 appendStringInfo(&buffer, _("database %s"), datname);
2531                                 break;
2532                         }
2533
2534                 case OCLASS_TBLSPACE:
2535                         {
2536                                 char       *tblspace;
2537
2538                                 tblspace = get_tablespace_name(object->objectId);
2539                                 if (!tblspace)
2540                                         elog(ERROR, "cache lookup failed for tablespace %u",
2541                                                  object->objectId);
2542                                 appendStringInfo(&buffer, _("tablespace %s"), tblspace);
2543                                 break;
2544                         }
2545
2546                 case OCLASS_FDW:
2547                         {
2548                                 ForeignDataWrapper *fdw;
2549
2550                                 fdw = GetForeignDataWrapper(object->objectId);
2551                                 appendStringInfo(&buffer, _("foreign-data wrapper %s"), fdw->fdwname);
2552                                 break;
2553                         }
2554
2555                 case OCLASS_FOREIGN_SERVER:
2556                         {
2557                                 ForeignServer *srv;
2558
2559                                 srv = GetForeignServer(object->objectId);
2560                                 appendStringInfo(&buffer, _("server %s"), srv->servername);
2561                                 break;
2562                         }
2563
2564                 case OCLASS_USER_MAPPING:
2565                         {
2566                                 HeapTuple               tup;
2567                                 Oid                             useid;
2568                                 char               *usename;
2569
2570                                 tup = SearchSysCache(USERMAPPINGOID,
2571                                                                          ObjectIdGetDatum(object->objectId),
2572                                                                          0, 0, 0);
2573                                 if (!HeapTupleIsValid(tup))
2574                                         elog(ERROR, "cache lookup failed for user mapping %u",
2575                                                  object->objectId);
2576
2577                                 useid = ((Form_pg_user_mapping) GETSTRUCT(tup))->umuser;
2578
2579                                 ReleaseSysCache(tup);
2580
2581                                 if (OidIsValid(useid))
2582                                         usename = GetUserNameFromId(useid);
2583                                 else
2584                                         usename = "public";
2585
2586                                 appendStringInfo(&buffer, _("user mapping for %s"), usename);
2587                                 break;
2588                         }
2589
2590                 default:
2591                         appendStringInfo(&buffer, "unrecognized object %u %u %d",
2592                                                          object->classId,
2593                                                          object->objectId,
2594                                                          object->objectSubId);
2595                         break;
2596         }
2597
2598         return buffer.data;
2599 }
2600
2601 /*
2602  * subroutine for getObjectDescription: describe a relation
2603  */
2604 static void
2605 getRelationDescription(StringInfo buffer, Oid relid)
2606 {
2607         HeapTuple       relTup;
2608         Form_pg_class relForm;
2609         char       *nspname;
2610         char       *relname;
2611
2612         relTup = SearchSysCache(RELOID,
2613                                                         ObjectIdGetDatum(relid),
2614                                                         0, 0, 0);
2615         if (!HeapTupleIsValid(relTup))
2616                 elog(ERROR, "cache lookup failed for relation %u", relid);
2617         relForm = (Form_pg_class) GETSTRUCT(relTup);
2618
2619         /* Qualify the name if not visible in search path */
2620         if (RelationIsVisible(relid))
2621                 nspname = NULL;
2622         else
2623                 nspname = get_namespace_name(relForm->relnamespace);
2624
2625         relname = quote_qualified_identifier(nspname, NameStr(relForm->relname));
2626
2627         switch (relForm->relkind)
2628         {
2629                 case RELKIND_RELATION:
2630                         appendStringInfo(buffer, _("table %s"),
2631                                                          relname);
2632                         break;
2633                 case RELKIND_INDEX:
2634                         appendStringInfo(buffer, _("index %s"),
2635                                                          relname);
2636                         break;
2637                 case RELKIND_SEQUENCE:
2638                         appendStringInfo(buffer, _("sequence %s"),
2639                                                          relname);
2640                         break;
2641                 case RELKIND_UNCATALOGED:
2642                         appendStringInfo(buffer, _("uncataloged table %s"),
2643                                                          relname);
2644                         break;
2645                 case RELKIND_TOASTVALUE:
2646                         appendStringInfo(buffer, _("toast table %s"),
2647                                                          relname);
2648                         break;
2649                 case RELKIND_VIEW:
2650                         appendStringInfo(buffer, _("view %s"),
2651                                                          relname);
2652                         break;
2653                 case RELKIND_COMPOSITE_TYPE:
2654                         appendStringInfo(buffer, _("composite type %s"),
2655                                                          relname);
2656                         break;
2657                 default:
2658                         /* shouldn't get here */
2659                         appendStringInfo(buffer, _("relation %s"),
2660                                                          relname);
2661                         break;
2662         }
2663
2664         ReleaseSysCache(relTup);
2665 }
2666
2667 /*
2668  * subroutine for getObjectDescription: describe an operator family
2669  */
2670 static void
2671 getOpFamilyDescription(StringInfo buffer, Oid opfid)
2672 {
2673         HeapTuple       opfTup;
2674         Form_pg_opfamily opfForm;
2675         HeapTuple       amTup;
2676         Form_pg_am      amForm;
2677         char       *nspname;
2678
2679         opfTup = SearchSysCache(OPFAMILYOID,
2680                                                         ObjectIdGetDatum(opfid),
2681                                                         0, 0, 0);
2682         if (!HeapTupleIsValid(opfTup))
2683                 elog(ERROR, "cache lookup failed for opfamily %u", opfid);
2684         opfForm = (Form_pg_opfamily) GETSTRUCT(opfTup);
2685
2686         amTup = SearchSysCache(AMOID,
2687                                                    ObjectIdGetDatum(opfForm->opfmethod),
2688                                                    0, 0, 0);
2689         if (!HeapTupleIsValid(amTup))
2690                 elog(ERROR, "cache lookup failed for access method %u",
2691                          opfForm->opfmethod);
2692         amForm = (Form_pg_am) GETSTRUCT(amTup);
2693
2694         /* Qualify the name if not visible in search path */
2695         if (OpfamilyIsVisible(opfid))
2696                 nspname = NULL;
2697         else
2698                 nspname = get_namespace_name(opfForm->opfnamespace);
2699
2700         appendStringInfo(buffer, _("operator family %s for access method %s"),
2701                                          quote_qualified_identifier(nspname,
2702                                                                                                 NameStr(opfForm->opfname)),
2703                                          NameStr(amForm->amname));
2704
2705         ReleaseSysCache(amTup);
2706         ReleaseSysCache(opfTup);
2707 }