]> granicus.if.org Git - postgresql/blob - src/backend/catalog/dependency.c
Support column-level privileges, as required by SQL standard.
[postgresql] / src / backend / catalog / dependency.c
1 /*-------------------------------------------------------------------------
2  *
3  * dependency.c
4  *        Routines to support inter-object dependencies.
5  *
6  *
7  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.86 2009/01/22 20:16:00 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/sysattr.h"
20 #include "access/xact.h"
21 #include "catalog/dependency.h"
22 #include "catalog/heap.h"
23 #include "catalog/index.h"
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/pg_amop.h"
27 #include "catalog/pg_amproc.h"
28 #include "catalog/pg_attrdef.h"
29 #include "catalog/pg_authid.h"
30 #include "catalog/pg_cast.h"
31 #include "catalog/pg_constraint.h"
32 #include "catalog/pg_conversion.h"
33 #include "catalog/pg_conversion_fn.h"
34 #include "catalog/pg_database.h"
35 #include "catalog/pg_depend.h"
36 #include "catalog/pg_foreign_data_wrapper.h"
37 #include "catalog/pg_foreign_server.h"
38 #include "catalog/pg_language.h"
39 #include "catalog/pg_namespace.h"
40 #include "catalog/pg_opclass.h"
41 #include "catalog/pg_operator.h"
42 #include "catalog/pg_opfamily.h"
43 #include "catalog/pg_proc.h"
44 #include "catalog/pg_rewrite.h"
45 #include "catalog/pg_tablespace.h"
46 #include "catalog/pg_trigger.h"
47 #include "catalog/pg_ts_config.h"
48 #include "catalog/pg_ts_dict.h"
49 #include "catalog/pg_ts_parser.h"
50 #include "catalog/pg_ts_template.h"
51 #include "catalog/pg_type.h"
52 #include "catalog/pg_user_mapping.h"
53 #include "commands/comment.h"
54 #include "commands/dbcommands.h"
55 #include "commands/defrem.h"
56 #include "commands/proclang.h"
57 #include "commands/schemacmds.h"
58 #include "commands/tablespace.h"
59 #include "commands/trigger.h"
60 #include "commands/typecmds.h"
61 #include "foreign/foreign.h"
62 #include "miscadmin.h"
63 #include "nodes/nodeFuncs.h"
64 #include "parser/parsetree.h"
65 #include "rewrite/rewriteRemove.h"
66 #include "storage/lmgr.h"
67 #include "utils/builtins.h"
68 #include "utils/fmgroids.h"
69 #include "utils/guc.h"
70 #include "utils/lsyscache.h"
71 #include "utils/syscache.h"
72 #include "utils/tqual.h"
73
74
75 /*
76  * Deletion processing requires additional state for each ObjectAddress that
77  * it's planning to delete.  For simplicity and code-sharing we make the
78  * ObjectAddresses code support arrays with or without this extra state.
79  */
80 typedef struct
81 {
82         int                     flags;                  /* bitmask, see bit definitions below */
83         ObjectAddress dependee;         /* object whose deletion forced this one */
84 } ObjectAddressExtra;
85
/*
 * Bits usable in ObjectAddressExtra.flags.  Each records one way in which an
 * object came to be part of the deletion set.
 */
#define DEPFLAG_ORIGINAL	(1 << 0)	/* an original deletion target */
#define DEPFLAG_NORMAL		(1 << 1)	/* reached via normal dependency */
#define DEPFLAG_AUTO		(1 << 2)	/* reached via auto dependency */
#define DEPFLAG_INTERNAL	(1 << 3)	/* reached via internal dependency */
91
92
93 /* expansible list of ObjectAddresses */
94 struct ObjectAddresses
95 {
96         ObjectAddress *refs;            /* => palloc'd array */
97         ObjectAddressExtra *extras;     /* => palloc'd array, or NULL if not used */
98         int                     numrefs;                /* current number of references */
99         int                     maxrefs;                /* current size of palloc'd array(s) */
100 };
101
102 /* typedef ObjectAddresses appears in dependency.h */
103
104 /* threaded list of ObjectAddresses, for recursion detection */
105 typedef struct ObjectAddressStack
106 {
107         const ObjectAddress *object; /* object being visited */
108         int                     flags;                  /* its current flag bits */
109         struct ObjectAddressStack *next; /* next outer stack level */
110 } ObjectAddressStack;
111
112 /* for find_expr_references_walker */
113 typedef struct
114 {
115         ObjectAddresses *addrs;         /* addresses being accumulated */
116         List       *rtables;            /* list of rangetables to resolve Vars */
117 } find_expr_references_context;
118
119 /*
120  * This constant table maps ObjectClasses to the corresponding catalog OIDs.
121  * See also getObjectClass().
122  */
123 static const Oid object_classes[MAX_OCLASS] = {
124         RelationRelationId,                     /* OCLASS_CLASS */
125         ProcedureRelationId,            /* OCLASS_PROC */
126         TypeRelationId,                         /* OCLASS_TYPE */
127         CastRelationId,                         /* OCLASS_CAST */
128         ConstraintRelationId,           /* OCLASS_CONSTRAINT */
129         ConversionRelationId,           /* OCLASS_CONVERSION */
130         AttrDefaultRelationId,          /* OCLASS_DEFAULT */
131         LanguageRelationId,                     /* OCLASS_LANGUAGE */
132         OperatorRelationId,                     /* OCLASS_OPERATOR */
133         OperatorClassRelationId,        /* OCLASS_OPCLASS */
134         OperatorFamilyRelationId,       /* OCLASS_OPFAMILY */
135         AccessMethodOperatorRelationId,         /* OCLASS_AMOP */
136         AccessMethodProcedureRelationId,        /* OCLASS_AMPROC */
137         RewriteRelationId,                      /* OCLASS_REWRITE */
138         TriggerRelationId,                      /* OCLASS_TRIGGER */
139         NamespaceRelationId,            /* OCLASS_SCHEMA */
140         TSParserRelationId,                     /* OCLASS_TSPARSER */
141         TSDictionaryRelationId,         /* OCLASS_TSDICT */
142         TSTemplateRelationId,           /* OCLASS_TSTEMPLATE */
143         TSConfigRelationId,                     /* OCLASS_TSCONFIG */
144         AuthIdRelationId,                       /* OCLASS_ROLE */
145         DatabaseRelationId,                     /* OCLASS_DATABASE */
146         TableSpaceRelationId            /* OCLASS_TBLSPACE */
147 };
148
149
150 static void findDependentObjects(const ObjectAddress *object,
151                                          int flags,
152                                          ObjectAddressStack *stack,
153                                          ObjectAddresses *targetObjects,
154                                          const ObjectAddresses *pendingObjects,
155                                          Relation depRel);
156 static void reportDependentObjects(const ObjectAddresses *targetObjects,
157                                            DropBehavior behavior,
158                                            int msglevel,
159                                            const ObjectAddress *origObject);
160 static void deleteOneObject(const ObjectAddress *object, Relation depRel);
161 static void doDeletion(const ObjectAddress *object);
162 static void AcquireDeletionLock(const ObjectAddress *object);
163 static void ReleaseDeletionLock(const ObjectAddress *object);
164 static bool find_expr_references_walker(Node *node,
165                                                         find_expr_references_context *context);
166 static void eliminate_duplicate_dependencies(ObjectAddresses *addrs);
167 static int      object_address_comparator(const void *a, const void *b);
168 static void add_object_address(ObjectClass oclass, Oid objectId, int32 subId,
169                                    ObjectAddresses *addrs);
170 static void add_exact_object_address_extra(const ObjectAddress *object,
171                                                            const ObjectAddressExtra *extra,
172                                                            ObjectAddresses *addrs);
173 static bool object_address_present_add_flags(const ObjectAddress *object,
174                                                                                          int flags,
175                                                                                          ObjectAddresses *addrs);
176 static void getRelationDescription(StringInfo buffer, Oid relid);
177 static void getOpFamilyDescription(StringInfo buffer, Oid opfid);
178
179
180 /*
181  * performDeletion: attempt to drop the specified object.  If CASCADE
182  * behavior is specified, also drop any dependent objects (recursively).
183  * If RESTRICT behavior is specified, error out if there are any dependent
184  * objects, except for those that should be implicitly dropped anyway
185  * according to the dependency type.
186  *
187  * This is the outer control routine for all forms of DROP that drop objects
188  * that can participate in dependencies.  Note that the next two routines
189  * are variants on the same theme; if you change anything here you'll likely
190  * need to fix them too.
191  */
192 void
193 performDeletion(const ObjectAddress *object,
194                                 DropBehavior behavior)
195 {
196         Relation        depRel;
197         ObjectAddresses *targetObjects;
198         int                     i;
199
200         /*
201          * We save some cycles by opening pg_depend just once and passing the
202          * Relation pointer down to all the recursive deletion steps.
203          */
204         depRel = heap_open(DependRelationId, RowExclusiveLock);
205
206         /*
207          * Acquire deletion lock on the target object.  (Ideally the caller has
208          * done this already, but many places are sloppy about it.)
209          */
210         AcquireDeletionLock(object);
211
212         /*
213          * Construct a list of objects to delete (ie, the given object plus
214          * everything directly or indirectly dependent on it).
215          */
216         targetObjects = new_object_addresses();
217
218         findDependentObjects(object,
219                                                  DEPFLAG_ORIGINAL,
220                                                  NULL,  /* empty stack */
221                                                  targetObjects,
222                                                  NULL,  /* no pendingObjects */
223                                                  depRel);
224
225         /*
226          * Check if deletion is allowed, and report about cascaded deletes.
227          */
228         reportDependentObjects(targetObjects,
229                                                    behavior,
230                                                    NOTICE,
231                                                    object);
232
233         /*
234          * Delete all the objects in the proper order.
235          */
236         for (i = 0; i < targetObjects->numrefs; i++)
237         {
238                 ObjectAddress *thisobj = targetObjects->refs + i;
239
240                 deleteOneObject(thisobj, depRel);
241         }
242
243         /* And clean up */
244         free_object_addresses(targetObjects);
245
246         heap_close(depRel, RowExclusiveLock);
247 }
248
249 /*
250  * performMultipleDeletions: Similar to performDeletion, but act on multiple
251  * objects at once.
252  *
253  * The main difference from issuing multiple performDeletion calls is that the
254  * list of objects that would be implicitly dropped, for each object to be
255  * dropped, is the union of the implicit-object list for all objects.  This
256  * makes each check be more relaxed.
257  */
258 void
259 performMultipleDeletions(const ObjectAddresses *objects,
260                                                  DropBehavior behavior)
261 {
262         Relation        depRel;
263         ObjectAddresses *targetObjects;
264         int                     i;
265
266         /* No work if no objects... */
267         if (objects->numrefs <= 0)
268                 return;
269
270         /*
271          * We save some cycles by opening pg_depend just once and passing the
272          * Relation pointer down to all the recursive deletion steps.
273          */
274         depRel = heap_open(DependRelationId, RowExclusiveLock);
275
276         /*
277          * Construct a list of objects to delete (ie, the given objects plus
278          * everything directly or indirectly dependent on them).  Note that
279          * because we pass the whole objects list as pendingObjects context,
280          * we won't get a failure from trying to delete an object that is
281          * internally dependent on another one in the list; we'll just skip
282          * that object and delete it when we reach its owner.
283          */
284         targetObjects = new_object_addresses();
285
286         for (i = 0; i < objects->numrefs; i++)
287         {
288                 const ObjectAddress *thisobj = objects->refs + i;
289
290                 /*
291                  * Acquire deletion lock on each target object.  (Ideally the caller
292                  * has done this already, but many places are sloppy about it.)
293                  */
294                 AcquireDeletionLock(thisobj);
295
296                 findDependentObjects(thisobj,
297                                                          DEPFLAG_ORIGINAL,
298                                                          NULL,  /* empty stack */
299                                                          targetObjects,
300                                                          objects,
301                                                          depRel);
302         }
303
304         /*
305          * Check if deletion is allowed, and report about cascaded deletes.
306          *
307          * If there's exactly one object being deleted, report it the same
308          * way as in performDeletion(), else we have to be vaguer.
309          */
310         reportDependentObjects(targetObjects,
311                                                    behavior,
312                                                    NOTICE,
313                                                    (objects->numrefs == 1 ? objects->refs : NULL));
314
315         /*
316          * Delete all the objects in the proper order.
317          */
318         for (i = 0; i < targetObjects->numrefs; i++)
319         {
320                 ObjectAddress *thisobj = targetObjects->refs + i;
321
322                 deleteOneObject(thisobj, depRel);
323         }
324
325         /* And clean up */
326         free_object_addresses(targetObjects);
327
328         heap_close(depRel, RowExclusiveLock);
329 }
330
331 /*
332  * deleteWhatDependsOn: attempt to drop everything that depends on the
333  * specified object, though not the object itself.      Behavior is always
334  * CASCADE.
335  *
336  * This is currently used only to clean out the contents of a schema
337  * (namespace): the passed object is a namespace.  We normally want this
338  * to be done silently, so there's an option to suppress NOTICE messages.
339  */
340 void
341 deleteWhatDependsOn(const ObjectAddress *object,
342                                         bool showNotices)
343 {
344         Relation        depRel;
345         ObjectAddresses *targetObjects;
346         int                     i;
347
348         /*
349          * We save some cycles by opening pg_depend just once and passing the
350          * Relation pointer down to all the recursive deletion steps.
351          */
352         depRel = heap_open(DependRelationId, RowExclusiveLock);
353
354         /*
355          * Acquire deletion lock on the target object.  (Ideally the caller has
356          * done this already, but many places are sloppy about it.)
357          */
358         AcquireDeletionLock(object);
359
360         /*
361          * Construct a list of objects to delete (ie, the given object plus
362          * everything directly or indirectly dependent on it).
363          */
364         targetObjects = new_object_addresses();
365
366         findDependentObjects(object,
367                                                  DEPFLAG_ORIGINAL,
368                                                  NULL,  /* empty stack */
369                                                  targetObjects,
370                                                  NULL,  /* no pendingObjects */
371                                                  depRel);
372
373         /*
374          * Check if deletion is allowed, and report about cascaded deletes.
375          */
376         reportDependentObjects(targetObjects,
377                                                    DROP_CASCADE,
378                                                    showNotices ? NOTICE : DEBUG2,
379                                                    object);
380
381         /*
382          * Delete all the objects in the proper order, except we skip the original
383          * object.
384          */
385         for (i = 0; i < targetObjects->numrefs; i++)
386         {
387                 ObjectAddress *thisobj = targetObjects->refs + i;
388                 ObjectAddressExtra *thisextra = targetObjects->extras + i;
389
390                 if (thisextra->flags & DEPFLAG_ORIGINAL)
391                         continue;
392
393                 deleteOneObject(thisobj, depRel);
394         }
395
396         /* And clean up */
397         free_object_addresses(targetObjects);
398
399         heap_close(depRel, RowExclusiveLock);
400 }
401
402 /*
403  * findDependentObjects - find all objects that depend on 'object'
404  *
405  * For every object that depends on the starting object, acquire a deletion
406  * lock on the object, add it to targetObjects (if not already there),
407  * and recursively find objects that depend on it.  An object's dependencies
408  * will be placed into targetObjects before the object itself; this means
409  * that the finished list's order represents a safe deletion order.
410  *
411  * The caller must already have a deletion lock on 'object' itself,
412  * but must not have added it to targetObjects.  (Note: there are corner
413  * cases where we won't add the object either, and will also release the
414  * caller-taken lock.  This is a bit ugly, but the API is set up this way
415  * to allow easy rechecking of an object's liveness after we lock it.  See
416  * notes within the function.)
417  *
418  * When dropping a whole object (subId = 0), we find dependencies for
419  * its sub-objects too.
420  *
421  *      object: the object to add to targetObjects and find dependencies on
422  *      flags: flags to be ORed into the object's targetObjects entry
423  *      stack: list of objects being visited in current recursion; topmost item
424  *                      is the object that we recursed from (NULL for external callers)
425  *      targetObjects: list of objects that are scheduled to be deleted
426  *      pendingObjects: list of other objects slated for destruction, but
427  *                      not necessarily in targetObjects yet (can be NULL if none)
428  *      depRel: already opened pg_depend relation
429  */
430 static void
431 findDependentObjects(const ObjectAddress *object,
432                                          int flags,
433                                          ObjectAddressStack *stack,
434                                          ObjectAddresses *targetObjects,
435                                          const ObjectAddresses *pendingObjects,
436                                          Relation depRel)
437 {
438         ScanKeyData key[3];
439         int                     nkeys;
440         SysScanDesc scan;
441         HeapTuple       tup;
442         ObjectAddress otherObject;
443         ObjectAddressStack mystack;
444         ObjectAddressExtra extra;
445         ObjectAddressStack *stackptr;
446
447         /*
448          * If the target object is already being visited in an outer recursion
449          * level, just report the current flags back to that level and exit.
450          * This is needed to avoid infinite recursion in the face of circular
451          * dependencies.
452          *
453          * The stack check alone would result in dependency loops being broken at
454          * an arbitrary point, ie, the first member object of the loop to be
455          * visited is the last one to be deleted.  This is obviously unworkable.
456          * However, the check for internal dependency below guarantees that we
457          * will not break a loop at an internal dependency: if we enter the loop
458          * at an "owned" object we will switch and start at the "owning" object
459          * instead.  We could probably hack something up to avoid breaking at an
460          * auto dependency, too, if we had to.  However there are no known cases
461          * where that would be necessary.
462          */
463         for (stackptr = stack; stackptr; stackptr = stackptr->next)
464         {
465                 if (object->classId == stackptr->object->classId &&
466                         object->objectId == stackptr->object->objectId)
467                 {
468                         if (object->objectSubId == stackptr->object->objectSubId)
469                         {
470                                 stackptr->flags |= flags;
471                                 return;
472                         }
473                         /*
474                          * Could visit column with whole table already on stack; this is
475                          * the same case noted in object_address_present_add_flags().
476                          * (It's not clear this can really happen, but we might as well
477                          * check.)
478                          */
479                         if (stackptr->object->objectSubId == 0)
480                                 return;
481                 }
482         }
483
484         /*
485          * It's also possible that the target object has already been completely
486          * processed and put into targetObjects.  If so, again we just add the
487          * specified flags to its entry and return.
488          *
489          * (Note: in these early-exit cases we could release the caller-taken
490          * lock, since the object is presumably now locked multiple times;
491          * but it seems not worth the cycles.)
492          */
493         if (object_address_present_add_flags(object, flags, targetObjects))
494                 return;
495
496         /*
497          * The target object might be internally dependent on some other object
498          * (its "owner").  If so, and if we aren't recursing from the owning
499          * object, we have to transform this deletion request into a deletion
500          * request of the owning object.  (We'll eventually recurse back to this
501          * object, but the owning object has to be visited first so it will be
502          * deleted after.)  The way to find out about this is to scan the
503          * pg_depend entries that show what this object depends on.
504          */
505         ScanKeyInit(&key[0],
506                                 Anum_pg_depend_classid,
507                                 BTEqualStrategyNumber, F_OIDEQ,
508                                 ObjectIdGetDatum(object->classId));
509         ScanKeyInit(&key[1],
510                                 Anum_pg_depend_objid,
511                                 BTEqualStrategyNumber, F_OIDEQ,
512                                 ObjectIdGetDatum(object->objectId));
513         if (object->objectSubId != 0)
514         {
515                 ScanKeyInit(&key[2],
516                                         Anum_pg_depend_objsubid,
517                                         BTEqualStrategyNumber, F_INT4EQ,
518                                         Int32GetDatum(object->objectSubId));
519                 nkeys = 3;
520         }
521         else
522                 nkeys = 2;
523
524         scan = systable_beginscan(depRel, DependDependerIndexId, true,
525                                                           SnapshotNow, nkeys, key);
526
527         while (HeapTupleIsValid(tup = systable_getnext(scan)))
528         {
529                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
530
531                 otherObject.classId = foundDep->refclassid;
532                 otherObject.objectId = foundDep->refobjid;
533                 otherObject.objectSubId = foundDep->refobjsubid;
534
535                 switch (foundDep->deptype)
536                 {
537                         case DEPENDENCY_NORMAL:
538                         case DEPENDENCY_AUTO:
539                                 /* no problem */
540                                 break;
541                         case DEPENDENCY_INTERNAL:
542
543                                 /*
544                                  * This object is part of the internal implementation of
545                                  * another object.      We have three cases:
546                                  *
547                                  * 1. At the outermost recursion level, disallow the DROP. (We
548                                  * just ereport here, rather than proceeding, since no other
549                                  * dependencies are likely to be interesting.)  However, if
550                                  * the other object is listed in pendingObjects, just release
551                                  * the caller's lock and return; we'll eventually complete
552                                  * the DROP when we reach that entry in the pending list.
553                                  */
554                                 if (stack == NULL)
555                                 {
556                                         char       *otherObjDesc;
557
558                                         if (object_address_present(&otherObject, pendingObjects))
559                                         {
560                                                 systable_endscan(scan);
561                                                 /* need to release caller's lock; see notes below */
562                                                 ReleaseDeletionLock(object);
563                                                 return;
564                                         }
565                                         otherObjDesc = getObjectDescription(&otherObject);
566                                         ereport(ERROR,
567                                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
568                                                          errmsg("cannot drop %s because %s requires it",
569                                                                         getObjectDescription(object),
570                                                                         otherObjDesc),
571                                                          errhint("You can drop %s instead.",
572                                                                          otherObjDesc)));
573                                 }
574
575                                 /*
576                                  * 2. When recursing from the other end of this dependency,
577                                  * it's okay to continue with the deletion. This holds when
578                                  * recursing from a whole object that includes the nominal
579                                  * other end as a component, too.
580                                  */
581                                 if (stack->object->classId == otherObject.classId &&
582                                         stack->object->objectId == otherObject.objectId &&
583                                         (stack->object->objectSubId == otherObject.objectSubId ||
584                                          stack->object->objectSubId == 0))
585                                         break;
586
587                                 /*
588                                  * 3. When recursing from anyplace else, transform this
589                                  * deletion request into a delete of the other object.
590                                  *
591                                  * First, release caller's lock on this object and get
592                                  * deletion lock on the other object.  (We must release
593                                  * caller's lock to avoid deadlock against a concurrent
594                                  * deletion of the other object.)
595                                  */
596                                 ReleaseDeletionLock(object);
597                                 AcquireDeletionLock(&otherObject);
598
599                                 /*
600                                  * The other object might have been deleted while we waited
601                                  * to lock it; if so, neither it nor the current object are
602                                  * interesting anymore.  We test this by checking the
603                                  * pg_depend entry (see notes below).
604                                  */
605                                 if (!systable_recheck_tuple(scan, tup))
606                                 {
607                                         systable_endscan(scan);
608                                         ReleaseDeletionLock(&otherObject);
609                                         return;
610                                 }
611
612                                 /*
613                                  * Okay, recurse to the other object instead of proceeding.
614                                  * We treat this exactly as if the original reference had
615                                  * linked to that object instead of this one; hence, pass
616                                  * through the same flags and stack.
617                                  */
618                                 findDependentObjects(&otherObject,
619                                                                          flags,
620                                                                          stack,
621                                                                          targetObjects,
622                                                                          pendingObjects,
623                                                                          depRel);
624                                 /* And we're done here. */
625                                 systable_endscan(scan);
626                                 return;
627                         case DEPENDENCY_PIN:
628
629                                 /*
630                                  * Should not happen; PIN dependencies should have zeroes in
631                                  * the depender fields...
632                                  */
633                                 elog(ERROR, "incorrect use of PIN dependency with %s",
634                                          getObjectDescription(object));
635                                 break;
636                         default:
637                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
638                                          foundDep->deptype, getObjectDescription(object));
639                                 break;
640                 }
641         }
642
643         systable_endscan(scan);
644
645         /*
646          * Now recurse to any dependent objects.  We must visit them first
647          * since they have to be deleted before the current object.
648          */
649         mystack.object = object;        /* set up a new stack level */
650         mystack.flags = flags;
651         mystack.next = stack;
652
653         ScanKeyInit(&key[0],
654                                 Anum_pg_depend_refclassid,
655                                 BTEqualStrategyNumber, F_OIDEQ,
656                                 ObjectIdGetDatum(object->classId));
657         ScanKeyInit(&key[1],
658                                 Anum_pg_depend_refobjid,
659                                 BTEqualStrategyNumber, F_OIDEQ,
660                                 ObjectIdGetDatum(object->objectId));
661         if (object->objectSubId != 0)
662         {
663                 ScanKeyInit(&key[2],
664                                         Anum_pg_depend_refobjsubid,
665                                         BTEqualStrategyNumber, F_INT4EQ,
666                                         Int32GetDatum(object->objectSubId));
667                 nkeys = 3;
668         }
669         else
670                 nkeys = 2;
671
672         scan = systable_beginscan(depRel, DependReferenceIndexId, true,
673                                                           SnapshotNow, nkeys, key);
674
675         while (HeapTupleIsValid(tup = systable_getnext(scan)))
676         {
677                 Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup);
678                 int             subflags;
679
680                 otherObject.classId = foundDep->classid;
681                 otherObject.objectId = foundDep->objid;
682                 otherObject.objectSubId = foundDep->objsubid;
683
684                 /*
685                  * Must lock the dependent object before recursing to it.
686                  */
687                 AcquireDeletionLock(&otherObject);
688
689                 /*
690                  * The dependent object might have been deleted while we waited
691                  * to lock it; if so, we don't need to do anything more with it.
692                  * We can test this cheaply and independently of the object's type
693                  * by seeing if the pg_depend tuple we are looking at is still live.
694                  * (If the object got deleted, the tuple would have been deleted too.)
695                  */
696                 if (!systable_recheck_tuple(scan, tup))
697                 {
698                         /* release the now-useless lock */
699                         ReleaseDeletionLock(&otherObject);
700                         /* and continue scanning for dependencies */
701                         continue;
702                 }
703
704                 /* Recurse, passing flags indicating the dependency type */
705                 switch (foundDep->deptype)
706                 {
707                         case DEPENDENCY_NORMAL:
708                                 subflags = DEPFLAG_NORMAL;
709                                 break;
710                         case DEPENDENCY_AUTO:
711                                 subflags = DEPFLAG_AUTO;
712                                 break;
713                         case DEPENDENCY_INTERNAL:
714                                 subflags = DEPFLAG_INTERNAL;
715                                 break;
716                         case DEPENDENCY_PIN:
717
718                                 /*
719                                  * For a PIN dependency we just ereport immediately; there
720                                  * won't be any others to report.
721                                  */
722                                 ereport(ERROR,
723                                                 (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
724                                                  errmsg("cannot drop %s because it is required by the database system",
725                                                                 getObjectDescription(object))));
726                                 subflags = 0;   /* keep compiler quiet */
727                                 break;
728                         default:
729                                 elog(ERROR, "unrecognized dependency type '%c' for %s",
730                                          foundDep->deptype, getObjectDescription(object));
731                                 subflags = 0;   /* keep compiler quiet */
732                                 break;
733                 }
734
735                 findDependentObjects(&otherObject,
736                                                          subflags,
737                                                          &mystack,
738                                                          targetObjects,
739                                                          pendingObjects,
740                                                          depRel);
741         }
742
743         systable_endscan(scan);
744
745         /*
746          * Finally, we can add the target object to targetObjects.  Be careful
747          * to include any flags that were passed back down to us from inner
748          * recursion levels.
749          */
750         extra.flags = mystack.flags;
751         if (stack)
752                 extra.dependee = *stack->object;
753         else
754                 memset(&extra.dependee, 0, sizeof(extra.dependee));
755         add_exact_object_address_extra(object, &extra, targetObjects);
756 }
757
758 /*
759  * reportDependentObjects - report about dependencies, and fail if RESTRICT
760  *
761  * Tell the user about dependent objects that we are going to delete
762  * (or would need to delete, but are prevented by RESTRICT mode);
763  * then error out if there are any and it's not CASCADE mode.
764  *
765  *      targetObjects: list of objects that are scheduled to be deleted
766  *      behavior: RESTRICT or CASCADE
767  *      msglevel: elog level for non-error report messages
768  *      origObject: base object of deletion, or NULL if not available
769  *              (the latter case occurs in DROP OWNED)
770  */
771 static void
772 reportDependentObjects(const ObjectAddresses *targetObjects,
773                                            DropBehavior behavior,
774                                            int msglevel,
775                                            const ObjectAddress *origObject)
776 {
777         bool            ok = true;
778         StringInfoData clientdetail;
779         StringInfoData logdetail;
780         int                     numReportedClient = 0;
781         int                     numNotReportedClient = 0;
782         int                     i;
783
784         /*
785          * If no error is to be thrown, and the msglevel is too low to be shown
786          * to either client or server log, there's no need to do any of the work.
787          *
788          * Note: this code doesn't know all there is to be known about elog
789          * levels, but it works for NOTICE and DEBUG2, which are the only values
790          * msglevel can currently have.  We also assume we are running in a normal
791          * operating environment.
792          */
793         if (behavior == DROP_CASCADE &&
794                 msglevel < client_min_messages &&
795                 (msglevel < log_min_messages || log_min_messages == LOG))
796                 return;
797
798         /*
799          * We limit the number of dependencies reported to the client to
800          * MAX_REPORTED_DEPS, since client software may not deal well with
801          * enormous error strings.      The server log always gets a full report.
802          */
803 #define MAX_REPORTED_DEPS 100
804
805         initStringInfo(&clientdetail);
806         initStringInfo(&logdetail);
807
808         /*
809          * We process the list back to front (ie, in dependency order not deletion
810          * order), since this makes for a more understandable display.
811          */
812         for (i = targetObjects->numrefs - 1; i >= 0; i--)
813         {
814                 const ObjectAddress *obj = &targetObjects->refs[i];
815                 const ObjectAddressExtra *extra = &targetObjects->extras[i];
816                 char       *objDesc;
817
818                 /* Ignore the original deletion target(s) */
819                 if (extra->flags & DEPFLAG_ORIGINAL)
820                         continue;
821
822                 objDesc = getObjectDescription(obj);
823
824                 /*
825                  * If, at any stage of the recursive search, we reached the object
826                  * via an AUTO or INTERNAL dependency, then it's okay to delete it
827                  * even in RESTRICT mode.
828                  */
829                 if (extra->flags & (DEPFLAG_AUTO | DEPFLAG_INTERNAL))
830                 {
831                         /*
832                          * auto-cascades are reported at DEBUG2, not msglevel.  We
833                          * don't try to combine them with the regular message because
834                          * the results are too confusing when client_min_messages and
835                          * log_min_messages are different.
836                          */
837                         ereport(DEBUG2,
838                                         (errmsg("drop auto-cascades to %s",
839                                                         objDesc)));
840                 }
841                 else if (behavior == DROP_RESTRICT)
842                 {
843                         char   *otherDesc = getObjectDescription(&extra->dependee);
844
845                         if (numReportedClient < MAX_REPORTED_DEPS)
846                         {
847                                 /* separate entries with a newline */
848                                 if (clientdetail.len != 0)
849                                         appendStringInfoChar(&clientdetail, '\n');
850                                 appendStringInfo(&clientdetail, _("%s depends on %s"),
851                                                                  objDesc, otherDesc);
852                                 numReportedClient++;
853                         }
854                         else
855                                 numNotReportedClient++;
856                         /* separate entries with a newline */
857                         if (logdetail.len != 0)
858                                 appendStringInfoChar(&logdetail, '\n');
859                         appendStringInfo(&logdetail, _("%s depends on %s"),
860                                                          objDesc, otherDesc);
861                         pfree(otherDesc);
862                         ok = false;
863                 }
864                 else
865                 {
866                         if (numReportedClient < MAX_REPORTED_DEPS)
867                         {
868                                 /* separate entries with a newline */
869                                 if (clientdetail.len != 0)
870                                         appendStringInfoChar(&clientdetail, '\n');
871                                 appendStringInfo(&clientdetail, _("drop cascades to %s"),
872                                                                  objDesc);
873                                 numReportedClient++;
874                         }
875                         else
876                                 numNotReportedClient++;
877                         /* separate entries with a newline */
878                         if (logdetail.len != 0)
879                                 appendStringInfoChar(&logdetail, '\n');
880                         appendStringInfo(&logdetail, _("drop cascades to %s"),
881                                                          objDesc);
882                 }
883
884                 pfree(objDesc);
885         }
886
887         if (numNotReportedClient > 0)
888                 appendStringInfo(&clientdetail, _("\nand %d other objects "
889                                                                                   "(see server log for list)"),
890                                                  numNotReportedClient);
891
892         if (!ok)
893         {
894                 if (origObject)
895                         ereport(ERROR,
896                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
897                                          errmsg("cannot drop %s because other objects depend on it",
898                                                         getObjectDescription(origObject)),
899                                          errdetail("%s", clientdetail.data),
900                                          errdetail_log("%s", logdetail.data),
901                                          errhint("Use DROP ... CASCADE to drop the dependent objects too.")));
902                 else
903                         ereport(ERROR,
904                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
905                                          errmsg("cannot drop desired object(s) because other objects depend on them"),
906                                          errdetail("%s", clientdetail.data),
907                                          errdetail_log("%s", logdetail.data),
908                                          errhint("Use DROP ... CASCADE to drop the dependent objects too.")));
909         }
910         else if (numReportedClient > 1)
911         {
912                 ereport(msglevel,
913                                 /* translator: %d always has a value larger than 1 */
914                                 (errmsg("drop cascades to %d other objects",
915                                                 numReportedClient + numNotReportedClient),
916                                  errdetail("%s", clientdetail.data),
917                                  errdetail_log("%s", logdetail.data)));
918         }
919         else if (numReportedClient == 1)
920         {
921                 /* we just use the single item as-is */
922                 ereport(msglevel,
923                                 (errmsg_internal("%s", clientdetail.data)));
924         }
925
926         pfree(clientdetail.data);
927         pfree(logdetail.data);
928 }
929
930 /*
931  * deleteOneObject: delete a single object for performDeletion.
932  *
933  * depRel is the already-open pg_depend relation.
934  */
935 static void
936 deleteOneObject(const ObjectAddress *object, Relation depRel)
937 {
938         ScanKeyData key[3];
939         int                     nkeys;
940         SysScanDesc scan;
941         HeapTuple       tup;
942
943         /*
944          * First remove any pg_depend records that link from this object to
945          * others.  (Any records linking to this object should be gone already.)
946          *
947          * When dropping a whole object (subId = 0), remove all pg_depend records
948          * for its sub-objects too.
949          */
950         ScanKeyInit(&key[0],
951                                 Anum_pg_depend_classid,
952                                 BTEqualStrategyNumber, F_OIDEQ,
953                                 ObjectIdGetDatum(object->classId));
954         ScanKeyInit(&key[1],
955                                 Anum_pg_depend_objid,
956                                 BTEqualStrategyNumber, F_OIDEQ,
957                                 ObjectIdGetDatum(object->objectId));
958         if (object->objectSubId != 0)
959         {
960                 ScanKeyInit(&key[2],
961                                         Anum_pg_depend_objsubid,
962                                         BTEqualStrategyNumber, F_INT4EQ,
963                                         Int32GetDatum(object->objectSubId));
964                 nkeys = 3;
965         }
966         else
967                 nkeys = 2;
968
969         scan = systable_beginscan(depRel, DependDependerIndexId, true,
970                                                           SnapshotNow, nkeys, key);
971
972         while (HeapTupleIsValid(tup = systable_getnext(scan)))
973         {
974                 simple_heap_delete(depRel, &tup->t_self);
975         }
976
977         systable_endscan(scan);
978
979         /*
980          * Delete shared dependency references related to this object.  Again,
981          * if subId = 0, remove records for sub-objects too.
982          */
983         deleteSharedDependencyRecordsFor(object->classId, object->objectId,
984                                                                          object->objectSubId);
985
986         /*
987          * Now delete the object itself, in an object-type-dependent way.
988          */
989         doDeletion(object);
990
991         /*
992          * Delete any comments associated with this object.  (This is a convenient
993          * place to do it instead of having every object type know to do it.)
994          */
995         DeleteComments(object->objectId, object->classId, object->objectSubId);
996
997         /*
998          * CommandCounterIncrement here to ensure that preceding changes are all
999          * visible to the next deletion step.
1000          */
1001         CommandCounterIncrement();
1002
1003         /*
1004          * And we're done!
1005          */
1006 }
1007
1008 /*
1009  * doDeletion: actually delete a single object
1010  */
1011 static void
1012 doDeletion(const ObjectAddress *object)
1013 {
1014         switch (getObjectClass(object))
1015         {
1016                 case OCLASS_CLASS:
1017                         {
1018                                 char            relKind = get_rel_relkind(object->objectId);
1019
1020                                 if (relKind == RELKIND_INDEX)
1021                                 {
1022                                         Assert(object->objectSubId == 0);
1023                                         index_drop(object->objectId);
1024                                 }
1025                                 else
1026                                 {
1027                                         if (object->objectSubId != 0)
1028                                                 RemoveAttributeById(object->objectId,
1029                                                                                         object->objectSubId);
1030                                         else
1031                                                 heap_drop_with_catalog(object->objectId);
1032                                 }
1033                                 break;
1034                         }
1035
1036                 case OCLASS_PROC:
1037                         RemoveFunctionById(object->objectId);
1038                         break;
1039
1040                 case OCLASS_TYPE:
1041                         RemoveTypeById(object->objectId);
1042                         break;
1043
1044                 case OCLASS_CAST:
1045                         DropCastById(object->objectId);
1046                         break;
1047
1048                 case OCLASS_CONSTRAINT:
1049                         RemoveConstraintById(object->objectId);
1050                         break;
1051
1052                 case OCLASS_CONVERSION:
1053                         RemoveConversionById(object->objectId);
1054                         break;
1055
1056                 case OCLASS_DEFAULT:
1057                         RemoveAttrDefaultById(object->objectId);
1058                         break;
1059
1060                 case OCLASS_LANGUAGE:
1061                         DropProceduralLanguageById(object->objectId);
1062                         break;
1063
1064                 case OCLASS_OPERATOR:
1065                         RemoveOperatorById(object->objectId);
1066                         break;
1067
1068                 case OCLASS_OPCLASS:
1069                         RemoveOpClassById(object->objectId);
1070                         break;
1071
1072                 case OCLASS_OPFAMILY:
1073                         RemoveOpFamilyById(object->objectId);
1074                         break;
1075
1076                 case OCLASS_AMOP:
1077                         RemoveAmOpEntryById(object->objectId);
1078                         break;
1079
1080                 case OCLASS_AMPROC:
1081                         RemoveAmProcEntryById(object->objectId);
1082                         break;
1083
1084                 case OCLASS_REWRITE:
1085                         RemoveRewriteRuleById(object->objectId);
1086                         break;
1087
1088                 case OCLASS_TRIGGER:
1089                         RemoveTriggerById(object->objectId);
1090                         break;
1091
1092                 case OCLASS_SCHEMA:
1093                         RemoveSchemaById(object->objectId);
1094                         break;
1095
1096                 case OCLASS_TSPARSER:
1097                         RemoveTSParserById(object->objectId);
1098                         break;
1099
1100                 case OCLASS_TSDICT:
1101                         RemoveTSDictionaryById(object->objectId);
1102                         break;
1103
1104                 case OCLASS_TSTEMPLATE:
1105                         RemoveTSTemplateById(object->objectId);
1106                         break;
1107
1108                 case OCLASS_TSCONFIG:
1109                         RemoveTSConfigurationById(object->objectId);
1110                         break;
1111
1112                 case OCLASS_USER_MAPPING:
1113                         RemoveUserMappingById(object->objectId);
1114                         break;
1115
1116                 case OCLASS_FOREIGN_SERVER:
1117                         RemoveForeignServerById(object->objectId);
1118                         break;
1119
1120                 case OCLASS_FDW:
1121                         RemoveForeignDataWrapperById(object->objectId);
1122                         break;
1123
1124                         /* OCLASS_ROLE, OCLASS_DATABASE, OCLASS_TBLSPACE not handled */
1125
1126                 default:
1127                         elog(ERROR, "unrecognized object class: %u",
1128                                  object->classId);
1129         }
1130 }
1131
1132 /*
1133  * AcquireDeletionLock - acquire a suitable lock for deleting an object
1134  *
1135  * We use LockRelation for relations, LockDatabaseObject for everything
1136  * else.  Note that dependency.c is not concerned with deleting any kind of
1137  * shared-across-databases object, so we have no need for LockSharedObject.
1138  */
1139 static void
1140 AcquireDeletionLock(const ObjectAddress *object)
1141 {
1142         if (object->classId == RelationRelationId)
1143                 LockRelationOid(object->objectId, AccessExclusiveLock);
1144         else
1145                 /* assume we should lock the whole object not a sub-object */
1146                 LockDatabaseObject(object->classId, object->objectId, 0,
1147                                                    AccessExclusiveLock);
1148 }
1149
1150 /*
1151  * ReleaseDeletionLock - release an object deletion lock
1152  */
1153 static void
1154 ReleaseDeletionLock(const ObjectAddress *object)
1155 {
1156         if (object->classId == RelationRelationId)
1157                 UnlockRelationOid(object->objectId, AccessExclusiveLock);
1158         else
1159                 /* assume we should lock the whole object not a sub-object */
1160                 UnlockDatabaseObject(object->classId, object->objectId, 0,
1161                                                          AccessExclusiveLock);
1162 }
1163
1164 /*
1165  * recordDependencyOnExpr - find expression dependencies
1166  *
1167  * This is used to find the dependencies of rules, constraint expressions,
1168  * etc.
1169  *
1170  * Given an expression or query in node-tree form, find all the objects
1171  * it refers to (tables, columns, operators, functions, etc).  Record
1172  * a dependency of the specified type from the given depender object
1173  * to each object mentioned in the expression.
1174  *
1175  * rtable is the rangetable to be used to interpret Vars with varlevelsup=0.
1176  * It can be NIL if no such variables are expected.
1177  */
1178 void
1179 recordDependencyOnExpr(const ObjectAddress *depender,
1180                                            Node *expr, List *rtable,
1181                                            DependencyType behavior)
1182 {
1183         find_expr_references_context context;
1184
1185         context.addrs = new_object_addresses();
1186
1187         /* Set up interpretation for Vars at varlevelsup = 0 */
1188         context.rtables = list_make1(rtable);
1189
1190         /* Scan the expression tree for referenceable objects */
1191         find_expr_references_walker(expr, &context);
1192
1193         /* Remove any duplicates */
1194         eliminate_duplicate_dependencies(context.addrs);
1195
1196         /* And record 'em */
1197         recordMultipleDependencies(depender,
1198                                                            context.addrs->refs, context.addrs->numrefs,
1199                                                            behavior);
1200
1201         free_object_addresses(context.addrs);
1202 }
1203
1204 /*
1205  * recordDependencyOnSingleRelExpr - find expression dependencies
1206  *
1207  * As above, but only one relation is expected to be referenced (with
1208  * varno = 1 and varlevelsup = 0).      Pass the relation OID instead of a
1209  * range table.  An additional frammish is that dependencies on that
1210  * relation (or its component columns) will be marked with 'self_behavior',
1211  * whereas 'behavior' is used for everything else.
1212  */
1213 void
1214 recordDependencyOnSingleRelExpr(const ObjectAddress *depender,
1215                                                                 Node *expr, Oid relId,
1216                                                                 DependencyType behavior,
1217                                                                 DependencyType self_behavior)
1218 {
1219         find_expr_references_context context;
1220         RangeTblEntry rte;
1221
1222         context.addrs = new_object_addresses();
1223
1224         /* We gin up a rather bogus rangetable list to handle Vars */
1225         MemSet(&rte, 0, sizeof(rte));
1226         rte.type = T_RangeTblEntry;
1227         rte.rtekind = RTE_RELATION;
1228         rte.relid = relId;
1229
1230         context.rtables = list_make1(list_make1(&rte));
1231
1232         /* Scan the expression tree for referenceable objects */
1233         find_expr_references_walker(expr, &context);
1234
1235         /* Remove any duplicates */
1236         eliminate_duplicate_dependencies(context.addrs);
1237
1238         /* Separate self-dependencies if necessary */
1239         if (behavior != self_behavior && context.addrs->numrefs > 0)
1240         {
1241                 ObjectAddresses *self_addrs;
1242                 ObjectAddress *outobj;
1243                 int                     oldref,
1244                                         outrefs;
1245
1246                 self_addrs = new_object_addresses();
1247
1248                 outobj = context.addrs->refs;
1249                 outrefs = 0;
1250                 for (oldref = 0; oldref < context.addrs->numrefs; oldref++)
1251                 {
1252                         ObjectAddress *thisobj = context.addrs->refs + oldref;
1253
1254                         if (thisobj->classId == RelationRelationId &&
1255                                 thisobj->objectId == relId)
1256                         {
1257                                 /* Move this ref into self_addrs */
1258                                 add_exact_object_address(thisobj, self_addrs);
1259                         }
1260                         else
1261                         {
1262                                 /* Keep it in context.addrs */
1263                                 *outobj = *thisobj;
1264                                 outobj++;
1265                                 outrefs++;
1266                         }
1267                 }
1268                 context.addrs->numrefs = outrefs;
1269
1270                 /* Record the self-dependencies */
1271                 recordMultipleDependencies(depender,
1272                                                                    self_addrs->refs, self_addrs->numrefs,
1273                                                                    self_behavior);
1274
1275                 free_object_addresses(self_addrs);
1276         }
1277
1278         /* Record the external dependencies */
1279         recordMultipleDependencies(depender,
1280                                                            context.addrs->refs, context.addrs->numrefs,
1281                                                            behavior);
1282
1283         free_object_addresses(context.addrs);
1284 }
1285
1286 /*
1287  * Recursively search an expression tree for object references.
1288  *
1289  * Note: we avoid creating references to columns of tables that participate
1290  * in an SQL JOIN construct, but are not actually used anywhere in the query.
1291  * To do so, we do not scan the joinaliasvars list of a join RTE while
1292  * scanning the query rangetable, but instead scan each individual entry
1293  * of the alias list when we find a reference to it.
1294  *
1295  * Note: in many cases we do not need to create dependencies on the datatypes
1296  * involved in an expression, because we'll have an indirect dependency via
1297  * some other object.  For instance Var nodes depend on a column which depends
1298  * on the datatype, and OpExpr nodes depend on the operator which depends on
1299  * the datatype.  However we do need a type dependency if there is no such
1300  * indirect dependency, as for example in Const and CoerceToDomain nodes.
1301  */
1302 static bool
1303 find_expr_references_walker(Node *node,
1304                                                         find_expr_references_context *context)
1305 {
1306         if (node == NULL)
1307                 return false;
1308         if (IsA(node, Var))
1309         {
1310                 Var                *var = (Var *) node;
1311                 List       *rtable;
1312                 RangeTblEntry *rte;
1313
1314                 /* Find matching rtable entry, or complain if not found */
1315                 if (var->varlevelsup >= list_length(context->rtables))
1316                         elog(ERROR, "invalid varlevelsup %d", var->varlevelsup);
1317                 rtable = (List *) list_nth(context->rtables, var->varlevelsup);
1318                 if (var->varno <= 0 || var->varno > list_length(rtable))
1319                         elog(ERROR, "invalid varno %d", var->varno);
1320                 rte = rt_fetch(var->varno, rtable);
1321
1322                 /*
1323                  * A whole-row Var references no specific columns, so adds no new
1324                  * dependency.
1325                  */
1326                 if (var->varattno == InvalidAttrNumber)
1327                         return false;
1328                 if (rte->rtekind == RTE_RELATION)
1329                 {
1330                         /* If it's a plain relation, reference this column */
1331                         add_object_address(OCLASS_CLASS, rte->relid, var->varattno,
1332                                                            context->addrs);
1333                 }
1334                 else if (rte->rtekind == RTE_JOIN)
1335                 {
1336                         /* Scan join output column to add references to join inputs */
1337                         List       *save_rtables;
1338
1339                         /* We must make the context appropriate for join's level */
1340                         save_rtables = context->rtables;
1341                         context->rtables = list_copy_tail(context->rtables,
1342                                                                                           var->varlevelsup);
1343                         if (var->varattno <= 0 ||
1344                                 var->varattno > list_length(rte->joinaliasvars))
1345                                 elog(ERROR, "invalid varattno %d", var->varattno);
1346                         find_expr_references_walker((Node *) list_nth(rte->joinaliasvars,
1347                                                                                                                   var->varattno - 1),
1348                                                                                 context);
1349                         list_free(context->rtables);
1350                         context->rtables = save_rtables;
1351                 }
1352                 return false;
1353         }
1354         else if (IsA(node, Const))
1355         {
1356                 Const      *con = (Const *) node;
1357                 Oid                     objoid;
1358
1359                 /* A constant must depend on the constant's datatype */
1360                 add_object_address(OCLASS_TYPE, con->consttype, 0,
1361                                                    context->addrs);
1362
1363                 /*
1364                  * If it's a regclass or similar literal referring to an existing
1365                  * object, add a reference to that object.      (Currently, only the
1366                  * regclass and regconfig cases have any likely use, but we may as
1367                  * well handle all the OID-alias datatypes consistently.)
1368                  */
1369                 if (!con->constisnull)
1370                 {
1371                         switch (con->consttype)
1372                         {
1373                                 case REGPROCOID:
1374                                 case REGPROCEDUREOID:
1375                                         objoid = DatumGetObjectId(con->constvalue);
1376                                         if (SearchSysCacheExists(PROCOID,
1377                                                                                          ObjectIdGetDatum(objoid),
1378                                                                                          0, 0, 0))
1379                                                 add_object_address(OCLASS_PROC, objoid, 0,
1380                                                                                    context->addrs);
1381                                         break;
1382                                 case REGOPEROID:
1383                                 case REGOPERATOROID:
1384                                         objoid = DatumGetObjectId(con->constvalue);
1385                                         if (SearchSysCacheExists(OPEROID,
1386                                                                                          ObjectIdGetDatum(objoid),
1387                                                                                          0, 0, 0))
1388                                                 add_object_address(OCLASS_OPERATOR, objoid, 0,
1389                                                                                    context->addrs);
1390                                         break;
1391                                 case REGCLASSOID:
1392                                         objoid = DatumGetObjectId(con->constvalue);
1393                                         if (SearchSysCacheExists(RELOID,
1394                                                                                          ObjectIdGetDatum(objoid),
1395                                                                                          0, 0, 0))
1396                                                 add_object_address(OCLASS_CLASS, objoid, 0,
1397                                                                                    context->addrs);
1398                                         break;
1399                                 case REGTYPEOID:
1400                                         objoid = DatumGetObjectId(con->constvalue);
1401                                         if (SearchSysCacheExists(TYPEOID,
1402                                                                                          ObjectIdGetDatum(objoid),
1403                                                                                          0, 0, 0))
1404                                                 add_object_address(OCLASS_TYPE, objoid, 0,
1405                                                                                    context->addrs);
1406                                         break;
1407                                 case REGCONFIGOID:
1408                                         objoid = DatumGetObjectId(con->constvalue);
1409                                         if (SearchSysCacheExists(TSCONFIGOID,
1410                                                                                          ObjectIdGetDatum(objoid),
1411                                                                                          0, 0, 0))
1412                                                 add_object_address(OCLASS_TSCONFIG, objoid, 0,
1413                                                                                    context->addrs);
1414                                         break;
1415                                 case REGDICTIONARYOID:
1416                                         objoid = DatumGetObjectId(con->constvalue);
1417                                         if (SearchSysCacheExists(TSDICTOID,
1418                                                                                          ObjectIdGetDatum(objoid),
1419                                                                                          0, 0, 0))
1420                                                 add_object_address(OCLASS_TSDICT, objoid, 0,
1421                                                                                    context->addrs);
1422                                         break;
1423                         }
1424                 }
1425                 return false;
1426         }
1427         else if (IsA(node, Param))
1428         {
1429                 Param      *param = (Param *) node;
1430
1431                 /* A parameter must depend on the parameter's datatype */
1432                 add_object_address(OCLASS_TYPE, param->paramtype, 0,
1433                                                    context->addrs);
1434         }
1435         else if (IsA(node, FuncExpr))
1436         {
1437                 FuncExpr   *funcexpr = (FuncExpr *) node;
1438
1439                 add_object_address(OCLASS_PROC, funcexpr->funcid, 0,
1440                                                    context->addrs);
1441                 /* fall through to examine arguments */
1442         }
1443         else if (IsA(node, OpExpr))
1444         {
1445                 OpExpr     *opexpr = (OpExpr *) node;
1446
1447                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1448                                                    context->addrs);
1449                 /* fall through to examine arguments */
1450         }
1451         else if (IsA(node, DistinctExpr))
1452         {
1453                 DistinctExpr *distinctexpr = (DistinctExpr *) node;
1454
1455                 add_object_address(OCLASS_OPERATOR, distinctexpr->opno, 0,
1456                                                    context->addrs);
1457                 /* fall through to examine arguments */
1458         }
1459         else if (IsA(node, ScalarArrayOpExpr))
1460         {
1461                 ScalarArrayOpExpr *opexpr = (ScalarArrayOpExpr *) node;
1462
1463                 add_object_address(OCLASS_OPERATOR, opexpr->opno, 0,
1464                                                    context->addrs);
1465                 /* fall through to examine arguments */
1466         }
1467         else if (IsA(node, NullIfExpr))
1468         {
1469                 NullIfExpr *nullifexpr = (NullIfExpr *) node;
1470
1471                 add_object_address(OCLASS_OPERATOR, nullifexpr->opno, 0,
1472                                                    context->addrs);
1473                 /* fall through to examine arguments */
1474         }
1475         else if (IsA(node, Aggref))
1476         {
1477                 Aggref     *aggref = (Aggref *) node;
1478
1479                 add_object_address(OCLASS_PROC, aggref->aggfnoid, 0,
1480                                                    context->addrs);
1481                 /* fall through to examine arguments */
1482         }
1483         else if (IsA(node, WindowFunc))
1484         {
1485                 WindowFunc *wfunc = (WindowFunc *) node;
1486
1487                 add_object_address(OCLASS_PROC, wfunc->winfnoid, 0,
1488                                                    context->addrs);
1489                 /* fall through to examine arguments */
1490         }
1491         else if (IsA(node, SubPlan))
1492         {
1493                 /* Extra work needed here if we ever need this case */
1494                 elog(ERROR, "already-planned subqueries not supported");
1495         }
1496         else if (IsA(node, RelabelType))
1497         {
1498                 RelabelType *relab = (RelabelType *) node;
1499
1500                 /* since there is no function dependency, need to depend on type */
1501                 add_object_address(OCLASS_TYPE, relab->resulttype, 0,
1502                                                    context->addrs);
1503         }
1504         else if (IsA(node, CoerceViaIO))
1505         {
1506                 CoerceViaIO *iocoerce = (CoerceViaIO *) node;
1507
1508                 /* since there is no exposed function, need to depend on type */
1509                 add_object_address(OCLASS_TYPE, iocoerce->resulttype, 0,
1510                                                    context->addrs);
1511         }
1512         else if (IsA(node, ArrayCoerceExpr))
1513         {
1514                 ArrayCoerceExpr *acoerce = (ArrayCoerceExpr *) node;
1515
1516                 if (OidIsValid(acoerce->elemfuncid))
1517                         add_object_address(OCLASS_PROC, acoerce->elemfuncid, 0,
1518                                                            context->addrs);
1519                 add_object_address(OCLASS_TYPE, acoerce->resulttype, 0,
1520                                                    context->addrs);
1521                 /* fall through to examine arguments */
1522         }
1523         else if (IsA(node, ConvertRowtypeExpr))
1524         {
1525                 ConvertRowtypeExpr *cvt = (ConvertRowtypeExpr *) node;
1526
1527                 /* since there is no function dependency, need to depend on type */
1528                 add_object_address(OCLASS_TYPE, cvt->resulttype, 0,
1529                                                    context->addrs);
1530         }
1531         else if (IsA(node, RowExpr))
1532         {
1533                 RowExpr    *rowexpr = (RowExpr *) node;
1534
1535                 add_object_address(OCLASS_TYPE, rowexpr->row_typeid, 0,
1536                                                    context->addrs);
1537         }
1538         else if (IsA(node, RowCompareExpr))
1539         {
1540                 RowCompareExpr *rcexpr = (RowCompareExpr *) node;
1541                 ListCell   *l;
1542
1543                 foreach(l, rcexpr->opnos)
1544                 {
1545                         add_object_address(OCLASS_OPERATOR, lfirst_oid(l), 0,
1546                                                            context->addrs);
1547                 }
1548                 foreach(l, rcexpr->opfamilies)
1549                 {
1550                         add_object_address(OCLASS_OPFAMILY, lfirst_oid(l), 0,
1551                                                            context->addrs);
1552                 }
1553                 /* fall through to examine arguments */
1554         }
1555         else if (IsA(node, CoerceToDomain))
1556         {
1557                 CoerceToDomain *cd = (CoerceToDomain *) node;
1558
1559                 add_object_address(OCLASS_TYPE, cd->resulttype, 0,
1560                                                    context->addrs);
1561         }
1562         else if (IsA(node, SortGroupClause))
1563         {
1564                 SortGroupClause *sgc = (SortGroupClause *) node;
1565
1566                 add_object_address(OCLASS_OPERATOR, sgc->eqop, 0,
1567                                                    context->addrs);
1568                 if (OidIsValid(sgc->sortop))
1569                         add_object_address(OCLASS_OPERATOR, sgc->sortop, 0,
1570                                                            context->addrs);
1571                 return false;
1572         }
1573         else if (IsA(node, Query))
1574         {
1575                 /* Recurse into RTE subquery or not-yet-planned sublink subquery */
1576                 Query      *query = (Query *) node;
1577                 ListCell   *rtable;
1578                 bool            result;
1579
1580                 /*
1581                  * Add whole-relation refs for each plain relation mentioned in the
1582                  * subquery's rtable, as well as datatype refs for any datatypes used
1583                  * as a RECORD function's output.  (Note: query_tree_walker takes care
1584                  * of recursing into RTE_FUNCTION RTEs, subqueries, etc, so no need to
1585                  * do that here.  But keep it from looking at join alias lists.)
1586                  */
1587                 foreach(rtable, query->rtable)
1588                 {
1589                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
1590                         ListCell   *ct;
1591
1592                         switch (rte->rtekind)
1593                         {
1594                                 case RTE_RELATION:
1595                                         add_object_address(OCLASS_CLASS, rte->relid, 0,
1596                                                                            context->addrs);
1597                                         break;
1598                                 case RTE_FUNCTION:
1599                                         foreach(ct, rte->funccoltypes)
1600                                         {
1601                                                 add_object_address(OCLASS_TYPE, lfirst_oid(ct), 0,
1602                                                                                    context->addrs);
1603                                         }
1604                                         break;
1605                                 default:
1606                                         break;
1607                         }
1608                 }
1609
1610                 /* query_tree_walker ignores ORDER BY etc, but we need those opers */
1611                 find_expr_references_walker((Node *) query->sortClause, context);
1612                 find_expr_references_walker((Node *) query->groupClause, context);
1613                 find_expr_references_walker((Node *) query->windowClause, context);
1614                 find_expr_references_walker((Node *) query->distinctClause, context);
1615
1616                 /* Examine substructure of query */
1617                 context->rtables = lcons(query->rtable, context->rtables);
1618                 result = query_tree_walker(query,
1619                                                                    find_expr_references_walker,
1620                                                                    (void *) context,
1621                                                                    QTW_IGNORE_JOINALIASES);
1622                 context->rtables = list_delete_first(context->rtables);
1623                 return result;
1624         }
1625         else if (IsA(node, SetOperationStmt))
1626         {
1627                 SetOperationStmt *setop = (SetOperationStmt *) node;
1628
1629                 /* we need to look at the groupClauses for operator references */
1630                 find_expr_references_walker((Node *) setop->groupClauses, context);
1631                 /* fall through to examine child nodes */
1632         }
1633
1634         return expression_tree_walker(node, find_expr_references_walker,
1635                                                                   (void *) context);
1636 }
1637
1638 /*
1639  * Given an array of dependency references, eliminate any duplicates.
1640  */
1641 static void
1642 eliminate_duplicate_dependencies(ObjectAddresses *addrs)
1643 {
1644         ObjectAddress *priorobj;
1645         int                     oldref,
1646                                 newrefs;
1647
1648         /*
1649          * We can't sort if the array has "extra" data, because there's no way
1650          * to keep it in sync.  Fortunately that combination of features is
1651          * not needed.
1652          */
1653         Assert(!addrs->extras);
1654
1655         if (addrs->numrefs <= 1)
1656                 return;                                 /* nothing to do */
1657
1658         /* Sort the refs so that duplicates are adjacent */
1659         qsort((void *) addrs->refs, addrs->numrefs, sizeof(ObjectAddress),
1660                   object_address_comparator);
1661
1662         /* Remove dups */
1663         priorobj = addrs->refs;
1664         newrefs = 1;
1665         for (oldref = 1; oldref < addrs->numrefs; oldref++)
1666         {
1667                 ObjectAddress *thisobj = addrs->refs + oldref;
1668
1669                 if (priorobj->classId == thisobj->classId &&
1670                         priorobj->objectId == thisobj->objectId)
1671                 {
1672                         if (priorobj->objectSubId == thisobj->objectSubId)
1673                                 continue;               /* identical, so drop thisobj */
1674
1675                         /*
1676                          * If we have a whole-object reference and a reference to a part
1677                          * of the same object, we don't need the whole-object reference
1678                          * (for example, we don't need to reference both table foo and
1679                          * column foo.bar).  The whole-object reference will always appear
1680                          * first in the sorted list.
1681                          */
1682                         if (priorobj->objectSubId == 0)
1683                         {
1684                                 /* replace whole ref with partial */
1685                                 priorobj->objectSubId = thisobj->objectSubId;
1686                                 continue;
1687                         }
1688                 }
1689                 /* Not identical, so add thisobj to output set */
1690                 priorobj++;
1691                 *priorobj = *thisobj;
1692                 newrefs++;
1693         }
1694
1695         addrs->numrefs = newrefs;
1696 }
1697
1698 /*
1699  * qsort comparator for ObjectAddress items
1700  */
1701 static int
1702 object_address_comparator(const void *a, const void *b)
1703 {
1704         const ObjectAddress *obja = (const ObjectAddress *) a;
1705         const ObjectAddress *objb = (const ObjectAddress *) b;
1706
1707         if (obja->classId < objb->classId)
1708                 return -1;
1709         if (obja->classId > objb->classId)
1710                 return 1;
1711         if (obja->objectId < objb->objectId)
1712                 return -1;
1713         if (obja->objectId > objb->objectId)
1714                 return 1;
1715
1716         /*
1717          * We sort the subId as an unsigned int so that 0 will come first. See
1718          * logic in eliminate_duplicate_dependencies.
1719          */
1720         if ((unsigned int) obja->objectSubId < (unsigned int) objb->objectSubId)
1721                 return -1;
1722         if ((unsigned int) obja->objectSubId > (unsigned int) objb->objectSubId)
1723                 return 1;
1724         return 0;
1725 }
1726
1727 /*
1728  * Routines for handling an expansible array of ObjectAddress items.
1729  *
1730  * new_object_addresses: create a new ObjectAddresses array.
1731  */
1732 ObjectAddresses *
1733 new_object_addresses(void)
1734 {
1735         ObjectAddresses *addrs;
1736
1737         addrs = palloc(sizeof(ObjectAddresses));
1738
1739         addrs->numrefs = 0;
1740         addrs->maxrefs = 32;
1741         addrs->refs = (ObjectAddress *)
1742                 palloc(addrs->maxrefs * sizeof(ObjectAddress));
1743         addrs->extras = NULL;           /* until/unless needed */
1744
1745         return addrs;
1746 }
1747
1748 /*
1749  * Add an entry to an ObjectAddresses array.
1750  *
1751  * It is convenient to specify the class by ObjectClass rather than directly
1752  * by catalog OID.
1753  */
1754 static void
1755 add_object_address(ObjectClass oclass, Oid objectId, int32 subId,
1756                                    ObjectAddresses *addrs)
1757 {
1758         ObjectAddress *item;
1759
1760         /* enlarge array if needed */
1761         if (addrs->numrefs >= addrs->maxrefs)
1762         {
1763                 addrs->maxrefs *= 2;
1764                 addrs->refs = (ObjectAddress *)
1765                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1766                 Assert(!addrs->extras);
1767         }
1768         /* record this item */
1769         item = addrs->refs + addrs->numrefs;
1770         item->classId = object_classes[oclass];
1771         item->objectId = objectId;
1772         item->objectSubId = subId;
1773         addrs->numrefs++;
1774 }
1775
1776 /*
1777  * Add an entry to an ObjectAddresses array.
1778  *
1779  * As above, but specify entry exactly.
1780  */
1781 void
1782 add_exact_object_address(const ObjectAddress *object,
1783                                                  ObjectAddresses *addrs)
1784 {
1785         ObjectAddress *item;
1786
1787         /* enlarge array if needed */
1788         if (addrs->numrefs >= addrs->maxrefs)
1789         {
1790                 addrs->maxrefs *= 2;
1791                 addrs->refs = (ObjectAddress *)
1792                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1793                 Assert(!addrs->extras);
1794         }
1795         /* record this item */
1796         item = addrs->refs + addrs->numrefs;
1797         *item = *object;
1798         addrs->numrefs++;
1799 }
1800
1801 /*
1802  * Add an entry to an ObjectAddresses array.
1803  *
1804  * As above, but specify entry exactly and provide some "extra" data too.
1805  */
1806 static void
1807 add_exact_object_address_extra(const ObjectAddress *object,
1808                                                            const ObjectAddressExtra *extra,
1809                                                            ObjectAddresses *addrs)
1810 {
1811         ObjectAddress *item;
1812         ObjectAddressExtra *itemextra;
1813
1814         /* allocate extra space if first time */
1815         if (!addrs->extras)
1816                 addrs->extras = (ObjectAddressExtra *)
1817                         palloc(addrs->maxrefs * sizeof(ObjectAddressExtra));
1818
1819         /* enlarge array if needed */
1820         if (addrs->numrefs >= addrs->maxrefs)
1821         {
1822                 addrs->maxrefs *= 2;
1823                 addrs->refs = (ObjectAddress *)
1824                         repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress));
1825                 addrs->extras = (ObjectAddressExtra *)
1826                   repalloc(addrs->extras, addrs->maxrefs * sizeof(ObjectAddressExtra));
1827         }
1828         /* record this item */
1829         item = addrs->refs + addrs->numrefs;
1830         *item = *object;
1831         itemextra = addrs->extras + addrs->numrefs;
1832         *itemextra = *extra;
1833         addrs->numrefs++;
1834 }
1835
1836 /*
1837  * Test whether an object is present in an ObjectAddresses array.
1838  *
1839  * We return "true" if object is a subobject of something in the array, too.
1840  */
1841 bool
1842 object_address_present(const ObjectAddress *object,
1843                                            const ObjectAddresses *addrs)
1844 {
1845         int                     i;
1846
1847         for (i = addrs->numrefs - 1; i >= 0; i--)
1848         {
1849                 const ObjectAddress *thisobj = addrs->refs + i;
1850
1851                 if (object->classId == thisobj->classId &&
1852                         object->objectId == thisobj->objectId)
1853                 {
1854                         if (object->objectSubId == thisobj->objectSubId ||
1855                                 thisobj->objectSubId == 0)
1856                                 return true;
1857                 }
1858         }
1859
1860         return false;
1861 }
1862
1863 /*
1864  * As above, except that if the object is present then also OR the given
1865  * flags into its associated extra data (which must exist).
1866  */
1867 static bool
1868 object_address_present_add_flags(const ObjectAddress *object,
1869                                                                  int flags,
1870                                                                  ObjectAddresses *addrs)
1871 {
1872         int                     i;
1873
1874         for (i = addrs->numrefs - 1; i >= 0; i--)
1875         {
1876                 ObjectAddress *thisobj = addrs->refs + i;
1877
1878                 if (object->classId == thisobj->classId &&
1879                         object->objectId == thisobj->objectId)
1880                 {
1881                         if (object->objectSubId == thisobj->objectSubId)
1882                         {
1883                                 ObjectAddressExtra *thisextra = addrs->extras + i;
1884
1885                                 thisextra->flags |= flags;
1886                                 return true;
1887                         }
1888                         if (thisobj->objectSubId == 0)
1889                         {
1890                                 /*
1891                                  * We get here if we find a need to delete a column after
1892                                  * having already decided to drop its whole table.  Obviously
1893                                  * we no longer need to drop the column.  But don't plaster
1894                                  * its flags on the table.
1895                                  */
1896                                 return true;
1897                         }
1898                 }
1899         }
1900
1901         return false;
1902 }
1903
1904 /*
1905  * Record multiple dependencies from an ObjectAddresses array, after first
1906  * removing any duplicates.
1907  */
1908 void
1909 record_object_address_dependencies(const ObjectAddress *depender,
1910                                                                    ObjectAddresses *referenced,
1911                                                                    DependencyType behavior)
1912 {
1913         eliminate_duplicate_dependencies(referenced);
1914         recordMultipleDependencies(depender,
1915                                                            referenced->refs, referenced->numrefs,
1916                                                            behavior);
1917 }
1918
1919 /*
1920  * Clean up when done with an ObjectAddresses array.
1921  */
1922 void
1923 free_object_addresses(ObjectAddresses *addrs)
1924 {
1925         pfree(addrs->refs);
1926         if (addrs->extras)
1927                 pfree(addrs->extras);
1928         pfree(addrs);
1929 }
1930
1931 /*
1932  * Determine the class of a given object identified by objectAddress.
1933  *
1934  * This function is essentially the reverse mapping for the object_classes[]
1935  * table.  We implement it as a function because the OIDs aren't consecutive.
1936  */
1937 ObjectClass
1938 getObjectClass(const ObjectAddress *object)
1939 {
1940         switch (object->classId)
1941         {
1942                 case RelationRelationId:
1943                         /* caller must check objectSubId */
1944                         return OCLASS_CLASS;
1945
1946                 case ProcedureRelationId:
1947                         Assert(object->objectSubId == 0);
1948                         return OCLASS_PROC;
1949
1950                 case TypeRelationId:
1951                         Assert(object->objectSubId == 0);
1952                         return OCLASS_TYPE;
1953
1954                 case CastRelationId:
1955                         Assert(object->objectSubId == 0);
1956                         return OCLASS_CAST;
1957
1958                 case ConstraintRelationId:
1959                         Assert(object->objectSubId == 0);
1960                         return OCLASS_CONSTRAINT;
1961
1962                 case ConversionRelationId:
1963                         Assert(object->objectSubId == 0);
1964                         return OCLASS_CONVERSION;
1965
1966                 case AttrDefaultRelationId:
1967                         Assert(object->objectSubId == 0);
1968                         return OCLASS_DEFAULT;
1969
1970                 case LanguageRelationId:
1971                         Assert(object->objectSubId == 0);
1972                         return OCLASS_LANGUAGE;
1973
1974                 case OperatorRelationId:
1975                         Assert(object->objectSubId == 0);
1976                         return OCLASS_OPERATOR;
1977
1978                 case OperatorClassRelationId:
1979                         Assert(object->objectSubId == 0);
1980                         return OCLASS_OPCLASS;
1981
1982                 case OperatorFamilyRelationId:
1983                         Assert(object->objectSubId == 0);
1984                         return OCLASS_OPFAMILY;
1985
1986                 case AccessMethodOperatorRelationId:
1987                         Assert(object->objectSubId == 0);
1988                         return OCLASS_AMOP;
1989
1990                 case AccessMethodProcedureRelationId:
1991                         Assert(object->objectSubId == 0);
1992                         return OCLASS_AMPROC;
1993
1994                 case RewriteRelationId:
1995                         Assert(object->objectSubId == 0);
1996                         return OCLASS_REWRITE;
1997
1998                 case TriggerRelationId:
1999                         Assert(object->objectSubId == 0);
2000                         return OCLASS_TRIGGER;
2001
2002                 case NamespaceRelationId:
2003                         Assert(object->objectSubId == 0);
2004                         return OCLASS_SCHEMA;
2005
2006                 case TSParserRelationId:
2007                         Assert(object->objectSubId == 0);
2008                         return OCLASS_TSPARSER;
2009
2010                 case TSDictionaryRelationId:
2011                         Assert(object->objectSubId == 0);
2012                         return OCLASS_TSDICT;
2013
2014                 case TSTemplateRelationId:
2015                         Assert(object->objectSubId == 0);
2016                         return OCLASS_TSTEMPLATE;
2017
2018                 case TSConfigRelationId:
2019                         Assert(object->objectSubId == 0);
2020                         return OCLASS_TSCONFIG;
2021
2022                 case AuthIdRelationId:
2023                         Assert(object->objectSubId == 0);
2024                         return OCLASS_ROLE;
2025
2026                 case DatabaseRelationId:
2027                         Assert(object->objectSubId == 0);
2028                         return OCLASS_DATABASE;
2029
2030                 case TableSpaceRelationId:
2031                         Assert(object->objectSubId == 0);
2032                         return OCLASS_TBLSPACE;
2033
2034                 case ForeignDataWrapperRelationId:
2035                         Assert(object->objectSubId == 0);
2036                         return OCLASS_FDW;
2037
2038                 case ForeignServerRelationId:
2039                         Assert(object->objectSubId == 0);
2040                         return OCLASS_FOREIGN_SERVER;
2041
2042                 case UserMappingRelationId:
2043                         Assert(object->objectSubId == 0);
2044                         return OCLASS_USER_MAPPING;
2045         }
2046
2047         /* shouldn't get here */
2048         elog(ERROR, "unrecognized object class: %u", object->classId);
2049         return OCLASS_CLASS;            /* keep compiler quiet */
2050 }
2051
2052 /*
2053  * getObjectDescription: build an object description for messages
2054  *
2055  * The result is a palloc'd string.
2056  */
2057 char *
2058 getObjectDescription(const ObjectAddress *object)
2059 {
2060         StringInfoData buffer;
2061
2062         initStringInfo(&buffer);
2063
2064         switch (getObjectClass(object))
2065         {
2066                 case OCLASS_CLASS:
2067                         getRelationDescription(&buffer, object->objectId);
2068                         if (object->objectSubId != 0)
2069                                 appendStringInfo(&buffer, _(" column %s"),
2070                                                                  get_relid_attribute_name(object->objectId,
2071                                                                                                            object->objectSubId));
2072                         break;
2073
2074                 case OCLASS_PROC:
2075                         appendStringInfo(&buffer, _("function %s"),
2076                                                          format_procedure(object->objectId));
2077                         break;
2078
2079                 case OCLASS_TYPE:
2080                         appendStringInfo(&buffer, _("type %s"),
2081                                                          format_type_be(object->objectId));
2082                         break;
2083
2084                 case OCLASS_CAST:
2085                         {
2086                                 Relation        castDesc;
2087                                 ScanKeyData skey[1];
2088                                 SysScanDesc rcscan;
2089                                 HeapTuple       tup;
2090                                 Form_pg_cast castForm;
2091
2092                                 castDesc = heap_open(CastRelationId, AccessShareLock);
2093
2094                                 ScanKeyInit(&skey[0],
2095                                                         ObjectIdAttributeNumber,
2096                                                         BTEqualStrategyNumber, F_OIDEQ,
2097                                                         ObjectIdGetDatum(object->objectId));
2098
2099                                 rcscan = systable_beginscan(castDesc, CastOidIndexId, true,
2100                                                                                         SnapshotNow, 1, skey);
2101
2102                                 tup = systable_getnext(rcscan);
2103
2104                                 if (!HeapTupleIsValid(tup))
2105                                         elog(ERROR, "could not find tuple for cast %u",
2106                                                  object->objectId);
2107
2108                                 castForm = (Form_pg_cast) GETSTRUCT(tup);
2109
2110                                 appendStringInfo(&buffer, _("cast from %s to %s"),
2111                                                                  format_type_be(castForm->castsource),
2112                                                                  format_type_be(castForm->casttarget));
2113
2114                                 systable_endscan(rcscan);
2115                                 heap_close(castDesc, AccessShareLock);
2116                                 break;
2117                         }
2118
2119                 case OCLASS_CONSTRAINT:
2120                         {
2121                                 HeapTuple       conTup;
2122                                 Form_pg_constraint con;
2123
2124                                 conTup = SearchSysCache(CONSTROID,
2125                                                                                 ObjectIdGetDatum(object->objectId),
2126                                                                                 0, 0, 0);
2127                                 if (!HeapTupleIsValid(conTup))
2128                                         elog(ERROR, "cache lookup failed for constraint %u",
2129                                                  object->objectId);
2130                                 con = (Form_pg_constraint) GETSTRUCT(conTup);
2131
2132                                 if (OidIsValid(con->conrelid))
2133                                 {
2134                                         StringInfoData  rel;
2135
2136                                         initStringInfo(&rel);
2137                                         getRelationDescription(&rel, con->conrelid);
2138                                         appendStringInfo(&buffer, _("constraint %s on %s"),
2139                                                                          NameStr(con->conname), rel.data);
2140                                         pfree(rel.data);
2141                                 }
2142                                 else
2143                                 {
2144                                         appendStringInfo(&buffer, _("constraint %s"),
2145                                                                          NameStr(con->conname));
2146                                 }
2147
2148                                 ReleaseSysCache(conTup);
2149                                 break;
2150                         }
2151
2152                 case OCLASS_CONVERSION:
2153                         {
2154                                 HeapTuple       conTup;
2155
2156                                 conTup = SearchSysCache(CONVOID,
2157                                                                                 ObjectIdGetDatum(object->objectId),
2158                                                                                 0, 0, 0);
2159                                 if (!HeapTupleIsValid(conTup))
2160                                         elog(ERROR, "cache lookup failed for conversion %u",
2161                                                  object->objectId);
2162                                 appendStringInfo(&buffer, _("conversion %s"),
2163                                  NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname));
2164                                 ReleaseSysCache(conTup);
2165                                 break;
2166                         }
2167
2168                 case OCLASS_DEFAULT:
2169                         {
2170                                 Relation        attrdefDesc;
2171                                 ScanKeyData skey[1];
2172                                 SysScanDesc adscan;
2173                                 HeapTuple       tup;
2174                                 Form_pg_attrdef attrdef;
2175                                 ObjectAddress colobject;
2176
2177                                 attrdefDesc = heap_open(AttrDefaultRelationId, AccessShareLock);
2178
2179                                 ScanKeyInit(&skey[0],
2180                                                         ObjectIdAttributeNumber,
2181                                                         BTEqualStrategyNumber, F_OIDEQ,
2182                                                         ObjectIdGetDatum(object->objectId));
2183
2184                                 adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndexId,
2185                                                                                         true, SnapshotNow, 1, skey);
2186
2187                                 tup = systable_getnext(adscan);
2188
2189                                 if (!HeapTupleIsValid(tup))
2190                                         elog(ERROR, "could not find tuple for attrdef %u",
2191                                                  object->objectId);
2192
2193                                 attrdef = (Form_pg_attrdef) GETSTRUCT(tup);
2194
2195                                 colobject.classId = RelationRelationId;
2196                                 colobject.objectId = attrdef->adrelid;
2197                                 colobject.objectSubId = attrdef->adnum;
2198
2199                                 appendStringInfo(&buffer, _("default for %s"),
2200                                                                  getObjectDescription(&colobject));
2201
2202                                 systable_endscan(adscan);
2203                                 heap_close(attrdefDesc, AccessShareLock);
2204                                 break;
2205                         }
2206
2207                 case OCLASS_LANGUAGE:
2208                         {
2209                                 HeapTuple       langTup;
2210
2211                                 langTup = SearchSysCache(LANGOID,
2212                                                                                  ObjectIdGetDatum(object->objectId),
2213                                                                                  0, 0, 0);
2214                                 if (!HeapTupleIsValid(langTup))
2215                                         elog(ERROR, "cache lookup failed for language %u",
2216                                                  object->objectId);
2217                                 appendStringInfo(&buffer, _("language %s"),
2218                                   NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname));
2219                                 ReleaseSysCache(langTup);
2220                                 break;
2221                         }
2222
2223                 case OCLASS_OPERATOR:
2224                         appendStringInfo(&buffer, _("operator %s"),
2225                                                          format_operator(object->objectId));
2226                         break;
2227
2228                 case OCLASS_OPCLASS:
2229                         {
2230                                 HeapTuple       opcTup;
2231                                 Form_pg_opclass opcForm;
2232                                 HeapTuple       amTup;
2233                                 Form_pg_am      amForm;
2234                                 char       *nspname;
2235
2236                                 opcTup = SearchSysCache(CLAOID,
2237                                                                                 ObjectIdGetDatum(object->objectId),
2238                                                                                 0, 0, 0);
2239                                 if (!HeapTupleIsValid(opcTup))
2240                                         elog(ERROR, "cache lookup failed for opclass %u",
2241                                                  object->objectId);
2242                                 opcForm = (Form_pg_opclass) GETSTRUCT(opcTup);
2243
2244                                 amTup = SearchSysCache(AMOID,
2245                                                                            ObjectIdGetDatum(opcForm->opcmethod),
2246                                                                            0, 0, 0);
2247                                 if (!HeapTupleIsValid(amTup))
2248                                         elog(ERROR, "cache lookup failed for access method %u",
2249                                                  opcForm->opcmethod);
2250                                 amForm = (Form_pg_am) GETSTRUCT(amTup);
2251
2252                                 /* Qualify the name if not visible in search path */
2253                                 if (OpclassIsVisible(object->objectId))
2254                                         nspname = NULL;
2255                                 else
2256                                         nspname = get_namespace_name(opcForm->opcnamespace);
2257
2258                                 appendStringInfo(&buffer, _("operator class %s for access method %s"),
2259                                                                  quote_qualified_identifier(nspname,
2260                                                                                                   NameStr(opcForm->opcname)),
2261                                                                  NameStr(amForm->amname));
2262
2263                                 ReleaseSysCache(amTup);
2264                                 ReleaseSysCache(opcTup);
2265                                 break;
2266                         }
2267
2268                 case OCLASS_OPFAMILY:
2269                         getOpFamilyDescription(&buffer, object->objectId);
2270                         break;
2271
2272                 case OCLASS_AMOP:
2273                         {
2274                                 Relation        amopDesc;
2275                                 ScanKeyData skey[1];
2276                                 SysScanDesc amscan;
2277                                 HeapTuple       tup;
2278                                 Form_pg_amop amopForm;
2279                                 StringInfoData opfam;
2280
2281                                 amopDesc = heap_open(AccessMethodOperatorRelationId,
2282                                                                          AccessShareLock);
2283
2284                                 ScanKeyInit(&skey[0],
2285                                                         ObjectIdAttributeNumber,
2286                                                         BTEqualStrategyNumber, F_OIDEQ,
2287                                                         ObjectIdGetDatum(object->objectId));
2288
2289                                 amscan = systable_beginscan(amopDesc, AccessMethodOperatorOidIndexId, true,
2290                                                                                         SnapshotNow, 1, skey);
2291
2292                                 tup = systable_getnext(amscan);
2293
2294                                 if (!HeapTupleIsValid(tup))
2295                                         elog(ERROR, "could not find tuple for amop entry %u",
2296                                                  object->objectId);
2297
2298                                 amopForm = (Form_pg_amop) GETSTRUCT(tup);
2299
2300                                 initStringInfo(&opfam);
2301                                 getOpFamilyDescription(&opfam, amopForm->amopfamily);
2302                                 /*
2303                                  * translator: %d is the operator strategy (a number), the
2304                                  * first %s is the textual form of the operator, and the second
2305                                  * %s is the description of the operator family.
2306                                  */
2307                                 appendStringInfo(&buffer, _("operator %d %s of %s"),
2308                                                                  amopForm->amopstrategy,
2309                                                                  format_operator(amopForm->amopopr),
2310                                                                  opfam.data);
2311                                 pfree(opfam.data);
2312
2313                                 systable_endscan(amscan);
2314                                 heap_close(amopDesc, AccessShareLock);
2315                                 break;
2316                         }
2317
2318                 case OCLASS_AMPROC:
2319                         {
2320                                 Relation        amprocDesc;
2321                                 ScanKeyData skey[1];
2322                                 SysScanDesc amscan;
2323                                 HeapTuple       tup;
2324                                 Form_pg_amproc amprocForm;
2325                                 StringInfoData opfam;
2326
2327                                 amprocDesc = heap_open(AccessMethodProcedureRelationId,
2328                                                                            AccessShareLock);
2329
2330                                 ScanKeyInit(&skey[0],
2331                                                         ObjectIdAttributeNumber,
2332                                                         BTEqualStrategyNumber, F_OIDEQ,
2333                                                         ObjectIdGetDatum(object->objectId));
2334
2335                                 amscan = systable_beginscan(amprocDesc, AccessMethodProcedureOidIndexId, true,
2336                                                                                         SnapshotNow, 1, skey);
2337
2338                                 tup = systable_getnext(amscan);
2339
2340                                 if (!HeapTupleIsValid(tup))
2341                                         elog(ERROR, "could not find tuple for amproc entry %u",
2342                                                  object->objectId);
2343
2344                                 amprocForm = (Form_pg_amproc) GETSTRUCT(tup);
2345
2346                                 initStringInfo(&opfam);
2347                                 getOpFamilyDescription(&opfam, amprocForm->amprocfamily);
2348                                 /*
2349                                  * translator: %d is the function number, the first %s is the
2350                                  * textual form of the function with arguments, and the second
2351                                  * %s is the description of the operator family.
2352                                  */
2353                                 appendStringInfo(&buffer, _("function %d %s of %s"),
2354                                                                  amprocForm->amprocnum,
2355                                                                  format_procedure(amprocForm->amproc),
2356                                                                  opfam.data);
2357                                 pfree(opfam.data);
2358
2359                                 systable_endscan(amscan);
2360                                 heap_close(amprocDesc, AccessShareLock);
2361                                 break;
2362                         }
2363
2364                 case OCLASS_REWRITE:
2365                         {
2366                                 Relation        ruleDesc;
2367                                 ScanKeyData skey[1];
2368                                 SysScanDesc rcscan;
2369                                 HeapTuple       tup;
2370                                 Form_pg_rewrite rule;
2371
2372                                 ruleDesc = heap_open(RewriteRelationId, AccessShareLock);
2373
2374                                 ScanKeyInit(&skey[0],
2375                                                         ObjectIdAttributeNumber,
2376                                                         BTEqualStrategyNumber, F_OIDEQ,
2377                                                         ObjectIdGetDatum(object->objectId));
2378
2379                                 rcscan = systable_beginscan(ruleDesc, RewriteOidIndexId, true,
2380                                                                                         SnapshotNow, 1, skey);
2381
2382                                 tup = systable_getnext(rcscan);
2383
2384                                 if (!HeapTupleIsValid(tup))
2385                                         elog(ERROR, "could not find tuple for rule %u",
2386                                                  object->objectId);
2387
2388                                 rule = (Form_pg_rewrite) GETSTRUCT(tup);
2389
2390                                 appendStringInfo(&buffer, _("rule %s on "),
2391                                                                  NameStr(rule->rulename));
2392                                 getRelationDescription(&buffer, rule->ev_class);
2393
2394                                 systable_endscan(rcscan);
2395                                 heap_close(ruleDesc, AccessShareLock);
2396                                 break;
2397                         }
2398
2399                 case OCLASS_TRIGGER:
2400                         {
2401                                 Relation        trigDesc;
2402                                 ScanKeyData skey[1];
2403                                 SysScanDesc tgscan;
2404                                 HeapTuple       tup;
2405                                 Form_pg_trigger trig;
2406
2407                                 trigDesc = heap_open(TriggerRelationId, AccessShareLock);
2408
2409                                 ScanKeyInit(&skey[0],
2410                                                         ObjectIdAttributeNumber,
2411                                                         BTEqualStrategyNumber, F_OIDEQ,
2412                                                         ObjectIdGetDatum(object->objectId));
2413
2414                                 tgscan = systable_beginscan(trigDesc, TriggerOidIndexId, true,
2415                                                                                         SnapshotNow, 1, skey);
2416
2417                                 tup = systable_getnext(tgscan);
2418
2419                                 if (!HeapTupleIsValid(tup))
2420                                         elog(ERROR, "could not find tuple for trigger %u",
2421                                                  object->objectId);
2422
2423                                 trig = (Form_pg_trigger) GETSTRUCT(tup);
2424
2425                                 appendStringInfo(&buffer, _("trigger %s on "),
2426                                                                  NameStr(trig->tgname));
2427                                 getRelationDescription(&buffer, trig->tgrelid);
2428
2429                                 systable_endscan(tgscan);
2430                                 heap_close(trigDesc, AccessShareLock);
2431                                 break;
2432                         }
2433
2434                 case OCLASS_SCHEMA:
2435                         {
2436                                 char       *nspname;
2437
2438                                 nspname = get_namespace_name(object->objectId);
2439                                 if (!nspname)
2440                                         elog(ERROR, "cache lookup failed for namespace %u",
2441                                                  object->objectId);
2442                                 appendStringInfo(&buffer, _("schema %s"), nspname);
2443                                 break;
2444                         }
2445
2446                 case OCLASS_TSPARSER:
2447                         {
2448                                 HeapTuple       tup;
2449
2450                                 tup = SearchSysCache(TSPARSEROID,
2451                                                                          ObjectIdGetDatum(object->objectId),
2452                                                                          0, 0, 0);
2453                                 if (!HeapTupleIsValid(tup))
2454                                         elog(ERROR, "cache lookup failed for text search parser %u",
2455                                                  object->objectId);
2456                                 appendStringInfo(&buffer, _("text search parser %s"),
2457                                          NameStr(((Form_pg_ts_parser) GETSTRUCT(tup))->prsname));
2458                                 ReleaseSysCache(tup);
2459                                 break;
2460                         }
2461
2462                 case OCLASS_TSDICT:
2463                         {
2464                                 HeapTuple       tup;
2465
2466                                 tup = SearchSysCache(TSDICTOID,
2467                                                                          ObjectIdGetDatum(object->objectId),
2468                                                                          0, 0, 0);
2469                                 if (!HeapTupleIsValid(tup))
2470                                         elog(ERROR, "cache lookup failed for text search dictionary %u",
2471                                                  object->objectId);
2472                                 appendStringInfo(&buffer, _("text search dictionary %s"),
2473                                           NameStr(((Form_pg_ts_dict) GETSTRUCT(tup))->dictname));
2474                                 ReleaseSysCache(tup);
2475                                 break;
2476                         }
2477
2478                 case OCLASS_TSTEMPLATE:
2479                         {
2480                                 HeapTuple       tup;
2481
2482                                 tup = SearchSysCache(TSTEMPLATEOID,
2483                                                                          ObjectIdGetDatum(object->objectId),
2484                                                                          0, 0, 0);
2485                                 if (!HeapTupleIsValid(tup))
2486                                         elog(ERROR, "cache lookup failed for text search template %u",
2487                                                  object->objectId);
2488                                 appendStringInfo(&buffer, _("text search template %s"),
2489                                   NameStr(((Form_pg_ts_template) GETSTRUCT(tup))->tmplname));
2490                                 ReleaseSysCache(tup);
2491                                 break;
2492                         }
2493
2494                 case OCLASS_TSCONFIG:
2495                         {
2496                                 HeapTuple       tup;
2497
2498                                 tup = SearchSysCache(TSCONFIGOID,
2499                                                                          ObjectIdGetDatum(object->objectId),
2500                                                                          0, 0, 0);
2501                                 if (!HeapTupleIsValid(tup))
2502                                         elog(ERROR, "cache lookup failed for text search configuration %u",
2503                                                  object->objectId);
2504                                 appendStringInfo(&buffer, _("text search configuration %s"),
2505                                          NameStr(((Form_pg_ts_config) GETSTRUCT(tup))->cfgname));
2506                                 ReleaseSysCache(tup);
2507                                 break;
2508                         }
2509
2510                 case OCLASS_ROLE:
2511                         {
2512                                 appendStringInfo(&buffer, _("role %s"),
2513                                                                  GetUserNameFromId(object->objectId));
2514                                 break;
2515                         }
2516
2517                 case OCLASS_DATABASE:
2518                         {
2519                                 char       *datname;
2520
2521                                 datname = get_database_name(object->objectId);
2522                                 if (!datname)
2523                                         elog(ERROR, "cache lookup failed for database %u",
2524                                                  object->objectId);
2525                                 appendStringInfo(&buffer, _("database %s"), datname);
2526                                 break;
2527                         }
2528
2529                 case OCLASS_TBLSPACE:
2530                         {
2531                                 char       *tblspace;
2532
2533                                 tblspace = get_tablespace_name(object->objectId);
2534                                 if (!tblspace)
2535                                         elog(ERROR, "cache lookup failed for tablespace %u",
2536                                                  object->objectId);
2537                                 appendStringInfo(&buffer, _("tablespace %s"), tblspace);
2538                                 break;
2539                         }
2540
2541                 case OCLASS_FDW:
2542                         {
2543                                 ForeignDataWrapper *fdw;
2544
2545                                 fdw = GetForeignDataWrapper(object->objectId);
2546                                 appendStringInfo(&buffer, _("foreign-data wrapper %s"), fdw->fdwname);
2547                                 break;
2548                         }
2549
2550                 case OCLASS_FOREIGN_SERVER:
2551                         {
2552                                 ForeignServer *srv;
2553
2554                                 srv = GetForeignServer(object->objectId);
2555                                 appendStringInfo(&buffer, _("server %s"), srv->servername);
2556                                 break;
2557                         }
2558
2559                 case OCLASS_USER_MAPPING:
2560                         {
2561                                 HeapTuple               tup;
2562                                 Oid                             useid;
2563                                 char               *usename;
2564
2565                                 tup = SearchSysCache(USERMAPPINGOID,
2566                                                                          ObjectIdGetDatum(object->objectId),
2567                                                                          0, 0, 0);
2568                                 if (!HeapTupleIsValid(tup))
2569                                         elog(ERROR, "cache lookup failed for user mapping %u",
2570                                                  object->objectId);
2571
2572                                 useid = ((Form_pg_user_mapping) GETSTRUCT(tup))->umuser;
2573
2574                                 ReleaseSysCache(tup);
2575
2576                                 if (OidIsValid(useid))
2577                                         usename = GetUserNameFromId(useid);
2578                                 else
2579                                         usename = "public";
2580
2581                                 appendStringInfo(&buffer, _("user mapping for %s"), usename);
2582                                 break;
2583                         }
2584
2585                 default:
2586                         appendStringInfo(&buffer, "unrecognized object %u %u %d",
2587                                                          object->classId,
2588                                                          object->objectId,
2589                                                          object->objectSubId);
2590                         break;
2591         }
2592
2593         return buffer.data;
2594 }
2595
2596 /*
2597  * subroutine for getObjectDescription: describe a relation
2598  */
2599 static void
2600 getRelationDescription(StringInfo buffer, Oid relid)
2601 {
2602         HeapTuple       relTup;
2603         Form_pg_class relForm;
2604         char       *nspname;
2605         char       *relname;
2606
2607         relTup = SearchSysCache(RELOID,
2608                                                         ObjectIdGetDatum(relid),
2609                                                         0, 0, 0);
2610         if (!HeapTupleIsValid(relTup))
2611                 elog(ERROR, "cache lookup failed for relation %u", relid);
2612         relForm = (Form_pg_class) GETSTRUCT(relTup);
2613
2614         /* Qualify the name if not visible in search path */
2615         if (RelationIsVisible(relid))
2616                 nspname = NULL;
2617         else
2618                 nspname = get_namespace_name(relForm->relnamespace);
2619
2620         relname = quote_qualified_identifier(nspname, NameStr(relForm->relname));
2621
2622         switch (relForm->relkind)
2623         {
2624                 case RELKIND_RELATION:
2625                         appendStringInfo(buffer, _("table %s"),
2626                                                          relname);
2627                         break;
2628                 case RELKIND_INDEX:
2629                         appendStringInfo(buffer, _("index %s"),
2630                                                          relname);
2631                         break;
2632                 case RELKIND_SEQUENCE:
2633                         appendStringInfo(buffer, _("sequence %s"),
2634                                                          relname);
2635                         break;
2636                 case RELKIND_UNCATALOGED:
2637                         appendStringInfo(buffer, _("uncataloged table %s"),
2638                                                          relname);
2639                         break;
2640                 case RELKIND_TOASTVALUE:
2641                         appendStringInfo(buffer, _("toast table %s"),
2642                                                          relname);
2643                         break;
2644                 case RELKIND_VIEW:
2645                         appendStringInfo(buffer, _("view %s"),
2646                                                          relname);
2647                         break;
2648                 case RELKIND_COMPOSITE_TYPE:
2649                         appendStringInfo(buffer, _("composite type %s"),
2650                                                          relname);
2651                         break;
2652                 default:
2653                         /* shouldn't get here */
2654                         appendStringInfo(buffer, _("relation %s"),
2655                                                          relname);
2656                         break;
2657         }
2658
2659         ReleaseSysCache(relTup);
2660 }
2661
2662 /*
2663  * subroutine for getObjectDescription: describe an operator family
2664  */
2665 static void
2666 getOpFamilyDescription(StringInfo buffer, Oid opfid)
2667 {
2668         HeapTuple       opfTup;
2669         Form_pg_opfamily opfForm;
2670         HeapTuple       amTup;
2671         Form_pg_am      amForm;
2672         char       *nspname;
2673
2674         opfTup = SearchSysCache(OPFAMILYOID,
2675                                                         ObjectIdGetDatum(opfid),
2676                                                         0, 0, 0);
2677         if (!HeapTupleIsValid(opfTup))
2678                 elog(ERROR, "cache lookup failed for opfamily %u", opfid);
2679         opfForm = (Form_pg_opfamily) GETSTRUCT(opfTup);
2680
2681         amTup = SearchSysCache(AMOID,
2682                                                    ObjectIdGetDatum(opfForm->opfmethod),
2683                                                    0, 0, 0);
2684         if (!HeapTupleIsValid(amTup))
2685                 elog(ERROR, "cache lookup failed for access method %u",
2686                          opfForm->opfmethod);
2687         amForm = (Form_pg_am) GETSTRUCT(amTup);
2688
2689         /* Qualify the name if not visible in search path */
2690         if (OpfamilyIsVisible(opfid))
2691                 nspname = NULL;
2692         else
2693                 nspname = get_namespace_name(opfForm->opfnamespace);
2694
2695         appendStringInfo(buffer, _("operator family %s for access method %s"),
2696                                          quote_qualified_identifier(nspname,
2697                                                                                                 NameStr(opfForm->opfname)),
2698                                          NameStr(amForm->amname));
2699
2700         ReleaseSysCache(amTup);
2701         ReleaseSysCache(opfTup);
2702 }