X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fcatalog%2Fdependency.c;h=793071b0f055757e16c3a640f12d1f04e0c5a3ef;hb=0cefb50f3ce964d6097aad64dabd9b544c3d2e68;hp=251fb82d813579daa7a953c75877f1d8af1c2b48;hpb=089003fb462fcce46c02bf47322b429f73c33c50;p=postgresql diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 251fb82d81..793071b0f0 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -4,11 +4,11 @@ * Routines to support inter-object dependencies. * * - * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.29 2003/08/04 00:43:16 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.76 2008/06/14 18:04:33 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -16,114 +16,161 @@ #include "access/genam.h" #include "access/heapam.h" -#include "catalog/catname.h" +#include "access/sysattr.h" +#include "access/xact.h" #include "catalog/dependency.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/pg_amop.h" +#include "catalog/pg_amproc.h" #include "catalog/pg_attrdef.h" +#include "catalog/pg_authid.h" #include "catalog/pg_cast.h" #include "catalog/pg_constraint.h" #include "catalog/pg_conversion.h" +#include "catalog/pg_conversion_fn.h" +#include "catalog/pg_database.h" #include "catalog/pg_depend.h" #include "catalog/pg_language.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_operator.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_proc.h" #include "catalog/pg_rewrite.h" +#include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" +#include "catalog/pg_ts_config.h" +#include "catalog/pg_ts_dict.h" +#include "catalog/pg_ts_parser.h" +#include "catalog/pg_ts_template.h" +#include "catalog/pg_type.h" #include "commands/comment.h" +#include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/proclang.h" #include "commands/schemacmds.h" +#include "commands/tablespace.h" #include "commands/trigger.h" #include "commands/typecmds.h" -#include "lib/stringinfo.h" #include "miscadmin.h" #include "optimizer/clauses.h" #include "parser/parsetree.h" #include "rewrite/rewriteRemove.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "utils/tqual.h" -/* This enum covers all system catalogs whose OIDs can appear in classid. */ -typedef enum ObjectClasses +/* + * Deletion processing requires additional state for each ObjectAddress that + * it's planning to delete. For simplicity and code-sharing we make the + * ObjectAddresses code support arrays with or without this extra state. + */ +typedef struct { - OCLASS_CLASS, /* pg_class */ - OCLASS_PROC, /* pg_proc */ - OCLASS_TYPE, /* pg_type */ - OCLASS_CAST, /* pg_cast */ - OCLASS_CONSTRAINT, /* pg_constraint */ - OCLASS_CONVERSION, /* pg_conversion */ - OCLASS_DEFAULT, /* pg_attrdef */ - OCLASS_LANGUAGE, /* pg_language */ - OCLASS_OPERATOR, /* pg_operator */ - OCLASS_OPCLASS, /* pg_opclass */ - OCLASS_REWRITE, /* pg_rewrite */ - OCLASS_TRIGGER, /* pg_trigger */ - OCLASS_SCHEMA, /* pg_namespace */ - MAX_OCLASS /* MUST BE LAST */ -} ObjectClasses; + int flags; /* bitmask, see bit definitions below */ + ObjectAddress dependee; /* object whose deletion forced this one */ +} ObjectAddressExtra; + +/* ObjectAddressExtra flag bits */ +#define DEPFLAG_ORIGINAL 0x0001 /* an original deletion target */ +#define DEPFLAG_NORMAL 0x0002 /* reached via normal dependency */ +#define DEPFLAG_AUTO 0x0004 /* reached via auto dependency */ +#define DEPFLAG_INTERNAL 0x0008 /* reached via internal dependency */ + /* expansible list of ObjectAddresses */ -typedef struct +struct ObjectAddresses { ObjectAddress *refs; /* => palloc'd array */ + ObjectAddressExtra *extras; /* => palloc'd array, or NULL if not used */ int numrefs; /* current number of references */ - int maxrefs; /* current size of palloc'd array */ -} ObjectAddresses; + int maxrefs; /* current size of palloc'd array(s) */ +}; + +/* typedef ObjectAddresses appears in dependency.h */ + +/* threaded list of ObjectAddresses, for recursion detection */ +typedef struct ObjectAddressStack +{ + const ObjectAddress *object; /* object being visited */ + int flags; /* its current flag bits */ + struct ObjectAddressStack *next; /* next outer stack level */ +} ObjectAddressStack; /* for find_expr_references_walker */ typedef struct { - ObjectAddresses addrs; /* addresses being accumulated */ + ObjectAddresses *addrs; /* addresses being accumulated */ List *rtables; /* list of rangetables to resolve Vars */ } find_expr_references_context; - /* - * Because not all system catalogs have predetermined OIDs, we build a table - * mapping between ObjectClasses and OIDs. This is done at most once per - * backend run, to minimize lookup overhead. + * This constant table maps ObjectClasses to the corresponding catalog OIDs. + * See also getObjectClass(). */ -static bool object_classes_initialized = false; -static Oid object_classes[MAX_OCLASS]; - - -static void findAutoDeletableObjects(const ObjectAddress *object, - ObjectAddresses *oktodelete, - Relation depRel); -static bool recursiveDeletion(const ObjectAddress *object, - DropBehavior behavior, - int msglevel, - const ObjectAddress *callingObject, - ObjectAddresses *oktodelete, - Relation depRel); -static bool deleteDependentObjects(const ObjectAddress *object, - const char *objDescription, +static const Oid object_classes[MAX_OCLASS] = { + RelationRelationId, /* OCLASS_CLASS */ + ProcedureRelationId, /* OCLASS_PROC */ + TypeRelationId, /* OCLASS_TYPE */ + CastRelationId, /* OCLASS_CAST */ + ConstraintRelationId, /* OCLASS_CONSTRAINT */ + ConversionRelationId, /* OCLASS_CONVERSION */ + AttrDefaultRelationId, /* OCLASS_DEFAULT */ + LanguageRelationId, /* OCLASS_LANGUAGE */ + OperatorRelationId, /* OCLASS_OPERATOR */ + OperatorClassRelationId, /* OCLASS_OPCLASS */ + OperatorFamilyRelationId, /* OCLASS_OPFAMILY */ + AccessMethodOperatorRelationId, /* OCLASS_AMOP */ + AccessMethodProcedureRelationId, /* OCLASS_AMPROC */ + RewriteRelationId, /* OCLASS_REWRITE */ + TriggerRelationId, /* OCLASS_TRIGGER */ + NamespaceRelationId, /* OCLASS_SCHEMA */ + TSParserRelationId, /* OCLASS_TSPARSER */ + TSDictionaryRelationId, /* OCLASS_TSDICT */ + TSTemplateRelationId, /* OCLASS_TSTEMPLATE */ + TSConfigRelationId, /* OCLASS_TSCONFIG */ + AuthIdRelationId, /* OCLASS_ROLE */ + DatabaseRelationId, /* OCLASS_DATABASE */ + TableSpaceRelationId /* OCLASS_TBLSPACE */ +}; + + +static void findDependentObjects(const ObjectAddress *object, + int flags, + ObjectAddressStack *stack, + ObjectAddresses *targetObjects, + const ObjectAddresses *pendingObjects, + Relation depRel); +static void reportDependentObjects(const ObjectAddresses *targetObjects, DropBehavior behavior, int msglevel, - ObjectAddresses *oktodelete, - Relation depRel); + const ObjectAddress *origObject); +static void deleteOneObject(const ObjectAddress *object, Relation depRel); static void doDeletion(const ObjectAddress *object); +static void AcquireDeletionLock(const ObjectAddress *object); +static void ReleaseDeletionLock(const ObjectAddress *object); static bool find_expr_references_walker(Node *node, find_expr_references_context *context); static void eliminate_duplicate_dependencies(ObjectAddresses *addrs); static int object_address_comparator(const void *a, const void *b); -static void init_object_addresses(ObjectAddresses *addrs); -static void add_object_address(ObjectClasses oclass, Oid objectId, int32 subId, +static void add_object_address(ObjectClass oclass, Oid objectId, int32 subId, ObjectAddresses *addrs); -static void add_exact_object_address(const ObjectAddress *object, - ObjectAddresses *addrs); -static bool object_address_present(const ObjectAddress *object, - ObjectAddresses *addrs); -static void term_object_addresses(ObjectAddresses *addrs); -static void init_object_classes(void); -static ObjectClasses getObjectClass(const ObjectAddress *object); -static char *getObjectDescription(const ObjectAddress *object); +static void add_exact_object_address_extra(const ObjectAddress *object, + const ObjectAddressExtra *extra, + ObjectAddresses *addrs); +static bool object_address_present_add_flags(const ObjectAddress *object, + int flags, + ObjectAddresses *addrs); static void getRelationDescription(StringInfo buffer, Oid relid); +static void getOpFamilyDescription(StringInfo buffer, Oid opfid); /* @@ -134,303 +181,343 @@ static void getRelationDescription(StringInfo buffer, Oid relid); * according to the dependency type. * * This is the outer control routine for all forms of DROP that drop objects - * that can participate in dependencies. + * that can participate in dependencies. Note that the next two routines + * are variants on the same theme; if you change anything here you'll likely + * need to fix them too. */ void performDeletion(const ObjectAddress *object, DropBehavior behavior) { - char *objDescription; Relation depRel; - ObjectAddresses oktodelete; + ObjectAddresses *targetObjects; + int i; /* - * Get object description for possible use in failure message. Must do - * this before deleting it ... + * We save some cycles by opening pg_depend just once and passing the + * Relation pointer down to all the recursive deletion steps. */ - objDescription = getObjectDescription(object); + depRel = heap_open(DependRelationId, RowExclusiveLock); /* - * We save some cycles by opening pg_depend just once and passing the - * Relation pointer down to all the recursive deletion steps. + * Acquire deletion lock on the target object. (Ideally the caller has + * done this already, but many places are sloppy about it.) */ - depRel = heap_openr(DependRelationName, RowExclusiveLock); + AcquireDeletionLock(object); /* - * Construct a list of objects that are reachable by AUTO or INTERNAL - * dependencies from the target object. These should be deleted - * silently, even if the actual deletion pass first reaches one of - * them via a non-auto dependency. + * Construct a list of objects to delete (ie, the given object plus + * everything directly or indirectly dependent on it). */ - init_object_addresses(&oktodelete); + targetObjects = new_object_addresses(); - findAutoDeletableObjects(object, &oktodelete, depRel); + findDependentObjects(object, + DEPFLAG_ORIGINAL, + NULL, /* empty stack */ + targetObjects, + NULL, /* no pendingObjects */ + depRel); - if (!recursiveDeletion(object, behavior, NOTICE, - NULL, &oktodelete, depRel)) - ereport(ERROR, - (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), - errmsg("cannot drop %s because other objects depend on it", - objDescription), - errhint("Use DROP ... CASCADE to drop the dependent objects too."))); + /* + * Check if deletion is allowed, and report about cascaded deletes. + */ + reportDependentObjects(targetObjects, + behavior, + NOTICE, + object); - term_object_addresses(&oktodelete); + /* + * Delete all the objects in the proper order. + */ + for (i = 0; i < targetObjects->numrefs; i++) + { + ObjectAddress *thisobj = targetObjects->refs + i; - heap_close(depRel, RowExclusiveLock); + deleteOneObject(thisobj, depRel); + } - pfree(objDescription); -} + /* And clean up */ + free_object_addresses(targetObjects); + heap_close(depRel, RowExclusiveLock); +} /* - * deleteWhatDependsOn: attempt to drop everything that depends on the - * specified object, though not the object itself. Behavior is always - * CASCADE. + * performMultipleDeletions: Similar to performDeletion, but act on multiple + * objects at once. * - * This is currently used only to clean out the contents of a schema - * (namespace): the passed object is a namespace. We normally want this - * to be done silently, so there's an option to suppress NOTICE messages. + * The main difference from issuing multiple performDeletion calls is that the + * list of objects that would be implicitly dropped, for each object to be + * dropped, is the union of the implicit-object list for all objects. This + * makes each check be more relaxed. */ void -deleteWhatDependsOn(const ObjectAddress *object, - bool showNotices) +performMultipleDeletions(const ObjectAddresses *objects, + DropBehavior behavior) { - char *objDescription; Relation depRel; - ObjectAddresses oktodelete; + ObjectAddresses *targetObjects; + int i; - /* - * Get object description for possible use in failure messages - */ - objDescription = getObjectDescription(object); + /* No work if no objects... */ + if (objects->numrefs <= 0) + return; /* * We save some cycles by opening pg_depend just once and passing the * Relation pointer down to all the recursive deletion steps. */ - depRel = heap_openr(DependRelationName, RowExclusiveLock); + depRel = heap_open(DependRelationId, RowExclusiveLock); /* - * Construct a list of objects that are reachable by AUTO or INTERNAL - * dependencies from the target object. These should be deleted - * silently, even if the actual deletion pass first reaches one of - * them via a non-auto dependency. + * Construct a list of objects to delete (ie, the given objects plus + * everything directly or indirectly dependent on them). Note that + * because we pass the whole objects list as pendingObjects context, + * we won't get a failure from trying to delete an object that is + * internally dependent on another one in the list; we'll just skip + * that object and delete it when we reach its owner. */ - init_object_addresses(&oktodelete); + targetObjects = new_object_addresses(); - findAutoDeletableObjects(object, &oktodelete, depRel); + for (i = 0; i < objects->numrefs; i++) + { + const ObjectAddress *thisobj = objects->refs + i; + + /* + * Acquire deletion lock on each target object. (Ideally the caller + * has done this already, but many places are sloppy about it.) + */ + AcquireDeletionLock(thisobj); + + findDependentObjects(thisobj, + DEPFLAG_ORIGINAL, + NULL, /* empty stack */ + targetObjects, + objects, + depRel); + } /* - * Now invoke only step 2 of recursiveDeletion: just recurse to the - * stuff dependent on the given object. + * Check if deletion is allowed, and report about cascaded deletes. + * + * If there's exactly one object being deleted, report it the same + * way as in performDeletion(), else we have to be vaguer. */ - if (!deleteDependentObjects(object, objDescription, - DROP_CASCADE, - showNotices ? NOTICE : DEBUG2, - &oktodelete, depRel)) - ereport(ERROR, - (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), - errmsg("failed to drop all objects depending on %s", - objDescription))); + reportDependentObjects(targetObjects, + behavior, + NOTICE, + (objects->numrefs == 1 ? objects->refs : NULL)); /* - * We do not need CommandCounterIncrement here, since if step 2 did - * anything then each recursive call will have ended with one. + * Delete all the objects in the proper order. */ + for (i = 0; i < targetObjects->numrefs; i++) + { + ObjectAddress *thisobj = targetObjects->refs + i; + + deleteOneObject(thisobj, depRel); + } - term_object_addresses(&oktodelete); + /* And clean up */ + free_object_addresses(targetObjects); heap_close(depRel, RowExclusiveLock); - - pfree(objDescription); } - /* - * findAutoDeletableObjects: find all objects that are reachable by AUTO or - * INTERNAL dependency paths from the given object. Add them all to the - * oktodelete list. Note that the originally given object will also be - * added to the list. + * deleteWhatDependsOn: attempt to drop everything that depends on the + * specified object, though not the object itself. Behavior is always + * CASCADE. * - * depRel is the already-open pg_depend relation. + * This is currently used only to clean out the contents of a schema + * (namespace): the passed object is a namespace. We normally want this + * to be done silently, so there's an option to suppress NOTICE messages. */ -static void -findAutoDeletableObjects(const ObjectAddress *object, - ObjectAddresses *oktodelete, - Relation depRel) +void +deleteWhatDependsOn(const ObjectAddress *object, + bool showNotices) { - ScanKeyData key[3]; - int nkeys; - SysScanDesc scan; - HeapTuple tup; - ObjectAddress otherObject; + Relation depRel; + ObjectAddresses *targetObjects; + int i; /* - * If this object is already in oktodelete, then we already visited - * it; don't do so again (this prevents infinite recursion if there's - * a loop in pg_depend). Otherwise, add it. + * We save some cycles by opening pg_depend just once and passing the + * Relation pointer down to all the recursive deletion steps. */ - if (object_address_present(object, oktodelete)) - return; - add_exact_object_address(object, oktodelete); + depRel = heap_open(DependRelationId, RowExclusiveLock); /* - * Scan pg_depend records that link to this object, showing the things - * that depend on it. For each one that is AUTO or INTERNAL, visit - * the referencing object. - * - * When dropping a whole object (subId = 0), find pg_depend records for - * its sub-objects too. + * Acquire deletion lock on the target object. (Ideally the caller has + * done this already, but many places are sloppy about it.) */ - ScanKeyEntryInitialize(&key[0], 0x0, - Anum_pg_depend_refclassid, F_OIDEQ, - ObjectIdGetDatum(object->classId)); - ScanKeyEntryInitialize(&key[1], 0x0, - Anum_pg_depend_refobjid, F_OIDEQ, - ObjectIdGetDatum(object->objectId)); - if (object->objectSubId != 0) - { - ScanKeyEntryInitialize(&key[2], 0x0, - Anum_pg_depend_refobjsubid, F_INT4EQ, - Int32GetDatum(object->objectSubId)); - nkeys = 3; - } - else - nkeys = 2; + AcquireDeletionLock(object); - scan = systable_beginscan(depRel, DependReferenceIndex, true, - SnapshotNow, nkeys, key); + /* + * Construct a list of objects to delete (ie, the given object plus + * everything directly or indirectly dependent on it). + */ + targetObjects = new_object_addresses(); - while (HeapTupleIsValid(tup = systable_getnext(scan))) + findDependentObjects(object, + DEPFLAG_ORIGINAL, + NULL, /* empty stack */ + targetObjects, + NULL, /* no pendingObjects */ + depRel); + + /* + * Check if deletion is allowed, and report about cascaded deletes. + */ + reportDependentObjects(targetObjects, + DROP_CASCADE, + showNotices ? NOTICE : DEBUG2, + object); + + /* + * Delete all the objects in the proper order, except we skip the original + * object. + */ + for (i = 0; i < targetObjects->numrefs; i++) { - Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup); + ObjectAddress *thisobj = targetObjects->refs + i; + ObjectAddressExtra *thisextra = targetObjects->extras + i; - switch (foundDep->deptype) - { - case DEPENDENCY_NORMAL: - /* ignore */ - break; - case DEPENDENCY_AUTO: - case DEPENDENCY_INTERNAL: - /* recurse */ - otherObject.classId = foundDep->classid; - otherObject.objectId = foundDep->objid; - otherObject.objectSubId = foundDep->objsubid; - findAutoDeletableObjects(&otherObject, oktodelete, depRel); - break; - case DEPENDENCY_PIN: + if (thisextra->flags & DEPFLAG_ORIGINAL) + continue; - /* - * For a PIN dependency we just ereport immediately; there - * won't be any others to examine, and we aren't ever - * going to let the user delete it. - */ - ereport(ERROR, - (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), - errmsg("cannot drop %s because it is required by the database system", - getObjectDescription(object)))); - break; - default: - elog(ERROR, "unrecognized dependency type '%c' for %s", - foundDep->deptype, getObjectDescription(object)); - break; - } + deleteOneObject(thisobj, depRel); } - systable_endscan(scan); -} + /* And clean up */ + free_object_addresses(targetObjects); + heap_close(depRel, RowExclusiveLock); +} /* - * recursiveDeletion: delete a single object for performDeletion, plus - * (recursively) anything that depends on it. - * - * Returns TRUE if successful, FALSE if not. - * - * callingObject is NULL at the outer level, else identifies the object that - * we recursed from (the reference object that someone else needs to delete). + * findDependentObjects - find all objects that depend on 'object' * - * oktodelete is a list of objects verified deletable (ie, reachable by one - * or more AUTO or INTERNAL dependencies from the original target). - * - * depRel is the already-open pg_depend relation. + * For every object that depends on the starting object, acquire a deletion + * lock on the object, add it to targetObjects (if not already there), + * and recursively find objects that depend on it. An object's dependencies + * will be placed into targetObjects before the object itself; this means + * that the finished list's order represents a safe deletion order. * + * The caller must already have a deletion lock on 'object' itself, + * but must not have added it to targetObjects. (Note: there are corner + * cases where we won't add the object either, and will also release the + * caller-taken lock. This is a bit ugly, but the API is set up this way + * to allow easy rechecking of an object's liveness after we lock it. See + * notes within the function.) * - * In RESTRICT mode, we perform all the deletions anyway, but ereport a message - * and return FALSE if we find a restriction violation. performDeletion - * will then abort the transaction to nullify the deletions. We have to - * do it this way to (a) report all the direct and indirect dependencies - * while (b) not going into infinite recursion if there's a cycle. - * - * This is even more complex than one could wish, because it is possible for - * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL - * dependencies. Also, we might have a situation where we've been asked to - * delete object A, and objects B and C both have AUTO dependencies on A, - * but B also has a NORMAL dependency on C. (Since any of these paths might - * be indirect, we can't prevent these scenarios, but must cope instead.) - * If we visit C before B then we would mistakenly decide that the B->C link - * should prevent the restricted drop from occurring. To handle this, we make - * a pre-scan to find all the objects that are auto-deletable from A. If we - * visit C first, but B is present in the oktodelete list, then we make no - * complaint but recurse to delete B anyway. (Note that in general we must - * delete B before deleting C; the drop routine for B may try to access C.) + * When dropping a whole object (subId = 0), we find dependencies for + * its sub-objects too. * - * Note: in the case where the path to B is traversed first, we will not - * see the NORMAL dependency when we reach C, because of the pg_depend - * removals done in step 1. The oktodelete list is necessary just - * to make the behavior independent of the order in which pg_depend - * entries are visited. + * object: the object to add to targetObjects and find dependencies on + * flags: flags to be ORed into the object's targetObjects entry + * stack: list of objects being visited in current recursion; topmost item + * is the object that we recursed from (NULL for external callers) + * targetObjects: list of objects that are scheduled to be deleted + * pendingObjects: list of other objects slated for destruction, but + * not necessarily in targetObjects yet (can be NULL if none) + * depRel: already opened pg_depend relation */ -static bool -recursiveDeletion(const ObjectAddress *object, - DropBehavior behavior, - int msglevel, - const ObjectAddress *callingObject, - ObjectAddresses *oktodelete, - Relation depRel) +static void +findDependentObjects(const ObjectAddress *object, + int flags, + ObjectAddressStack *stack, + ObjectAddresses *targetObjects, + const ObjectAddresses *pendingObjects, + Relation depRel) { - bool ok = true; - char *objDescription; ScanKeyData key[3]; int nkeys; SysScanDesc scan; HeapTuple tup; ObjectAddress otherObject; - ObjectAddress owningObject; - bool amOwned = false; + ObjectAddressStack mystack; + ObjectAddressExtra extra; + ObjectAddressStack *stackptr; /* - * Get object description for possible use in messages. Must do this - * before deleting it ... + * If the target object is already being visited in an outer recursion + * level, just report the current flags back to that level and exit. + * This is needed to avoid infinite recursion in the face of circular + * dependencies. + * + * The stack check alone would result in dependency loops being broken at + * an arbitrary point, ie, the first member object of the loop to be + * visited is the last one to be deleted. This is obviously unworkable. + * However, the check for internal dependency below guarantees that we + * will not break a loop at an internal dependency: if we enter the loop + * at an "owned" object we will switch and start at the "owning" object + * instead. We could probably hack something up to avoid breaking at an + * auto dependency, too, if we had to. However there are no known cases + * where that would be necessary. */ - objDescription = getObjectDescription(object); + for (stackptr = stack; stackptr; stackptr = stackptr->next) + { + if (object->classId == stackptr->object->classId && + object->objectId == stackptr->object->objectId) + { + if (object->objectSubId == stackptr->object->objectSubId) + { + stackptr->flags |= flags; + return; + } + /* + * Could visit column with whole table already on stack; this is + * the same case noted in object_address_present_add_flags(). + * (It's not clear this can really happen, but we might as well + * check.) + */ + if (stackptr->object->objectSubId == 0) + return; + } + } /* - * Step 1: find and remove pg_depend records that link from this - * object to others. We have to do this anyway, and doing it first - * ensures that we avoid infinite recursion in the case of cycles. - * Also, some dependency types require extra processing here. + * It's also possible that the target object has already been completely + * processed and put into targetObjects. If so, again we just add the + * specified flags to its entry and return. * - * When dropping a whole object (subId = 0), remove all pg_depend records - * for its sub-objects too. + * (Note: in these early-exit cases we could release the caller-taken + * lock, since the object is presumably now locked multiple times; + * but it seems not worth the cycles.) + */ + if (object_address_present_add_flags(object, flags, targetObjects)) + return; + + /* + * The target object might be internally dependent on some other object + * (its "owner"). If so, and if we aren't recursing from the owning + * object, we have to transform this deletion request into a deletion + * request of the owning object. (We'll eventually recurse back to this + * object, but the owning object has to be visited first so it will be + * deleted after.) The way to find out about this is to scan the + * pg_depend entries that show what this object depends on. */ - ScanKeyEntryInitialize(&key[0], 0x0, - Anum_pg_depend_classid, F_OIDEQ, - ObjectIdGetDatum(object->classId)); - ScanKeyEntryInitialize(&key[1], 0x0, - Anum_pg_depend_objid, F_OIDEQ, - ObjectIdGetDatum(object->objectId)); + ScanKeyInit(&key[0], + Anum_pg_depend_classid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->classId)); + ScanKeyInit(&key[1], + Anum_pg_depend_objid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); if (object->objectSubId != 0) { - ScanKeyEntryInitialize(&key[2], 0x0, - Anum_pg_depend_objsubid, F_INT4EQ, - Int32GetDatum(object->objectSubId)); + ScanKeyInit(&key[2], + Anum_pg_depend_objsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum(object->objectSubId)); nkeys = 3; } else nkeys = 2; - scan = systable_beginscan(depRel, DependDependerIndex, true, + scan = systable_beginscan(depRel, DependDependerIndexId, true, SnapshotNow, nkeys, key); while (HeapTupleIsValid(tup = systable_getnext(scan))) @@ -453,295 +540,467 @@ recursiveDeletion(const ObjectAddress *object, * This object is part of the internal implementation of * another object. We have three cases: * - * 1. At the outermost recursion level, disallow the DROP. - * (We just ereport here, rather than proceeding, since no - * other dependencies are likely to be interesting.) + * 1. At the outermost recursion level, disallow the DROP. (We + * just ereport here, rather than proceeding, since no other + * dependencies are likely to be interesting.) However, if + * the other object is listed in pendingObjects, just release + * the caller's lock and return; we'll eventually complete + * the DROP when we reach that entry in the pending list. */ - if (callingObject == NULL) + if (stack == NULL) { - char *otherObjDesc = getObjectDescription(&otherObject); - + char *otherObjDesc; + + if (object_address_present(&otherObject, pendingObjects)) + { + systable_endscan(scan); + /* need to release caller's lock; see notes below */ + ReleaseDeletionLock(object); + return; + } + otherObjDesc = getObjectDescription(&otherObject); ereport(ERROR, - (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), - errmsg("cannot drop %s because %s requires it", - objDescription, otherObjDesc), - errhint("You may drop %s instead.", - otherObjDesc))); + (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), + errmsg("cannot drop %s because %s requires it", + getObjectDescription(object), + otherObjDesc), + errhint("You can drop %s instead.", + otherObjDesc))); } /* - * 2. When recursing from the other end of this - * dependency, it's okay to continue with the deletion. - * This holds when recursing from a whole object that - * includes the nominal other end as a component, too. + * 2. When recursing from the other end of this dependency, + * it's okay to continue with the deletion. This holds when + * recursing from a whole object that includes the nominal + * other end as a component, too. */ - if (callingObject->classId == otherObject.classId && - callingObject->objectId == otherObject.objectId && - (callingObject->objectSubId == otherObject.objectSubId || - callingObject->objectSubId == 0)) + if (stack->object->classId == otherObject.classId && + stack->object->objectId == otherObject.objectId && + (stack->object->objectSubId == otherObject.objectSubId || + stack->object->objectSubId == 0)) break; /* * 3. When recursing from anyplace else, transform this * deletion request into a delete of the other object. - * (This will be an error condition iff RESTRICT mode.) In - * this case we finish deleting my dependencies except for - * the INTERNAL link, which will be needed to cause the - * owning object to recurse back to me. + * + * First, release caller's lock on this object and get + * deletion lock on the other object. (We must release + * caller's lock to avoid deadlock against a concurrent + * deletion of the other object.) */ - if (amOwned) /* shouldn't happen */ - elog(ERROR, "multiple INTERNAL dependencies for %s", - objDescription); - owningObject = otherObject; - amOwned = true; - /* "continue" bypasses the simple_heap_delete call below */ - continue; + ReleaseDeletionLock(object); + AcquireDeletionLock(&otherObject); + + /* + * The other object might have been deleted while we waited + * to lock it; if so, neither it nor the current object are + * interesting anymore. We test this by checking the + * pg_depend entry (see notes below). + */ + if (!systable_recheck_tuple(scan, tup)) + { + systable_endscan(scan); + ReleaseDeletionLock(&otherObject); + return; + } + + /* + * Okay, recurse to the other object instead of proceeding. + * We treat this exactly as if the original reference had + * linked to that object instead of this one; hence, pass + * through the same flags and stack. + */ + findDependentObjects(&otherObject, + flags, + stack, + targetObjects, + pendingObjects, + depRel); + /* And we're done here. */ + systable_endscan(scan); + return; case DEPENDENCY_PIN: /* - * Should not happen; PIN dependencies should have zeroes - * in the depender fields... + * Should not happen; PIN dependencies should have zeroes in + * the depender fields... */ elog(ERROR, "incorrect use of PIN dependency with %s", - objDescription); + getObjectDescription(object)); break; default: elog(ERROR, "unrecognized dependency type '%c' for %s", - foundDep->deptype, objDescription); + foundDep->deptype, getObjectDescription(object)); break; } - - simple_heap_delete(depRel, &tup->t_self); } systable_endscan(scan); /* - * CommandCounterIncrement here to ensure that preceding changes are - * all visible; in particular, that the above deletions of pg_depend - * entries are visible. That prevents infinite recursion in case of a - * dependency loop (which is perfectly legal). + * Now recurse to any dependent objects. We must visit them first + * since they have to be deleted before the current object. */ - CommandCounterIncrement(); + mystack.object = object; /* set up a new stack level */ + mystack.flags = flags; + mystack.next = stack; + + ScanKeyInit(&key[0], + Anum_pg_depend_refclassid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->classId)); + ScanKeyInit(&key[1], + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); + if (object->objectSubId != 0) + { + ScanKeyInit(&key[2], + Anum_pg_depend_refobjsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum(object->objectSubId)); + nkeys = 3; + } + else + nkeys = 2; - /* - * If we found we are owned by another object, ask it to delete itself - * instead of proceeding. Complain if RESTRICT mode, unless the other - * object is in oktodelete. - */ - if (amOwned) + scan = systable_beginscan(depRel, DependReferenceIndexId, true, + SnapshotNow, nkeys, key); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) { - if (object_address_present(&owningObject, oktodelete)) - ereport(DEBUG2, - (errmsg("drop auto-cascades to %s", - getObjectDescription(&owningObject)))); - else if (behavior == DROP_RESTRICT) + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup); + int subflags; + + otherObject.classId = foundDep->classid; + otherObject.objectId = foundDep->objid; + otherObject.objectSubId = foundDep->objsubid; + + /* + * Must lock the dependent object before recursing to it. + */ + AcquireDeletionLock(&otherObject); + + /* + * The dependent object might have been deleted while we waited + * to lock it; if so, we don't need to do anything more with it. + * We can test this cheaply and independently of the object's type + * by seeing if the pg_depend tuple we are looking at is still live. + * (If the object got deleted, the tuple would have been deleted too.) + */ + if (!systable_recheck_tuple(scan, tup)) { - ereport(msglevel, - (errmsg("%s depends on %s", - getObjectDescription(&owningObject), - objDescription))); - ok = false; + /* release the now-useless lock */ + ReleaseDeletionLock(&otherObject); + /* and continue scanning for dependencies */ + continue; } - else - ereport(msglevel, - (errmsg("drop cascades to %s", - getObjectDescription(&owningObject)))); - if (!recursiveDeletion(&owningObject, behavior, msglevel, - object, oktodelete, depRel)) - ok = false; + /* Recurse, passing flags indicating the dependency type */ + switch (foundDep->deptype) + { + case DEPENDENCY_NORMAL: + subflags = DEPFLAG_NORMAL; + break; + case DEPENDENCY_AUTO: + subflags = DEPFLAG_AUTO; + break; + case DEPENDENCY_INTERNAL: + subflags = DEPFLAG_INTERNAL; + break; + case DEPENDENCY_PIN: - pfree(objDescription); + /* + * For a PIN dependency we just ereport immediately; there + * won't be any others to report. + */ + ereport(ERROR, + (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), + errmsg("cannot drop %s because it is required by the database system", + getObjectDescription(object)))); + subflags = 0; /* keep compiler quiet */ + break; + default: + elog(ERROR, "unrecognized dependency type '%c' for %s", + foundDep->deptype, getObjectDescription(object)); + subflags = 0; /* keep compiler quiet */ + break; + } - return ok; + findDependentObjects(&otherObject, + subflags, + &mystack, + targetObjects, + pendingObjects, + depRel); } - /* - * Step 2: scan pg_depend records that link to this object, showing - * the things that depend on it. Recursively delete those things. - * Note it's important to delete the dependent objects before the - * referenced one, since the deletion routines might do things like - * try to update the pg_class record when deleting a check constraint. - */ - if (!deleteDependentObjects(object, objDescription, - behavior, msglevel, - oktodelete, depRel)) - ok = false; + systable_endscan(scan); /* - * We do not need CommandCounterIncrement here, since if step 2 did - * anything then each recursive call will have ended with one. + * Finally, we can add the target object to targetObjects. Be careful + * to include any flags that were passed back down to us from inner + * recursion levels. */ + extra.flags = mystack.flags; + if (stack) + extra.dependee = *stack->object; + else + memset(&extra.dependee, 0, sizeof(extra.dependee)); + add_exact_object_address_extra(object, &extra, targetObjects); +} - /* - * Step 3: delete the object itself. - */ - doDeletion(object); +/* + * reportDependentObjects - report about dependencies, and fail if RESTRICT + * + * Tell the user about dependent objects that we are going to delete + * (or would need to delete, but are prevented by RESTRICT mode); + * then error out if there are any and it's not CASCADE mode. + * + * targetObjects: list of objects that are scheduled to be deleted + * behavior: RESTRICT or CASCADE + * msglevel: elog level for non-error report messages + * origObject: base object of deletion, or NULL if not available + * (the latter case occurs in DROP OWNED) + */ +static void +reportDependentObjects(const ObjectAddresses *targetObjects, + DropBehavior behavior, + int msglevel, + const ObjectAddress *origObject) +{ + bool ok = true; + StringInfoData clientdetail; + StringInfoData logdetail; + int numReportedClient = 0; + int numNotReportedClient = 0; + int i; /* - * Delete any comments associated with this object. (This is a - * convenient place to do it instead of having every object type know - * to do it.) + * If no error is to be thrown, and the msglevel is too low to be shown + * to either client or server log, there's no need to do any of the work. + * + * Note: this code doesn't know all there is to be known about elog + * levels, but it works for NOTICE and DEBUG2, which are the only values + * msglevel can currently have. We also assume we are running in a normal + * operating environment. */ - DeleteComments(object->objectId, object->classId, object->objectSubId); + if (behavior == DROP_CASCADE && + msglevel < client_min_messages && + (msglevel < log_min_messages || log_min_messages == LOG)) + return; /* - * CommandCounterIncrement here to ensure that preceding changes are - * all visible. + * We limit the number of dependencies reported to the client to + * MAX_REPORTED_DEPS, since client software may not deal well with + * enormous error strings. The server log always gets a full report. */ - CommandCounterIncrement(); +#define MAX_REPORTED_DEPS 100 + + initStringInfo(&clientdetail); + initStringInfo(&logdetail); /* - * And we're done! + * We process the list back to front (ie, in dependency order not deletion + * order), since this makes for a more understandable display. */ - pfree(objDescription); + for (i = targetObjects->numrefs - 1; i >= 0; i--) + { + const ObjectAddress *obj = &targetObjects->refs[i]; + const ObjectAddressExtra *extra = &targetObjects->extras[i]; + char *objDesc; - return ok; -} + /* Ignore the original deletion target(s) */ + if (extra->flags & DEPFLAG_ORIGINAL) + continue; + objDesc = getObjectDescription(obj); + + /* + * If, at any stage of the recursive search, we reached the object + * via an AUTO or INTERNAL dependency, then it's okay to delete it + * even in RESTRICT mode. + */ + if (extra->flags & (DEPFLAG_AUTO | DEPFLAG_INTERNAL)) + { + /* + * auto-cascades are reported at DEBUG2, not msglevel. We + * don't try to combine them with the regular message because + * the results are too confusing when client_min_messages and + * log_min_messages are different. + */ + ereport(DEBUG2, + (errmsg("drop auto-cascades to %s", + objDesc))); + } + else if (behavior == DROP_RESTRICT) + { + char *otherDesc = getObjectDescription(&extra->dependee); + + if (numReportedClient < MAX_REPORTED_DEPS) + { + /* separate entries with a newline */ + if (clientdetail.len != 0) + appendStringInfoChar(&clientdetail, '\n'); + appendStringInfo(&clientdetail, _("%s depends on %s"), + objDesc, otherDesc); + numReportedClient++; + } + else + numNotReportedClient++; + /* separate entries with a newline */ + if (logdetail.len != 0) + appendStringInfoChar(&logdetail, '\n'); + appendStringInfo(&logdetail, _("%s depends on %s"), + objDesc, otherDesc); + pfree(otherDesc); + ok = false; + } + else + { + if (numReportedClient < MAX_REPORTED_DEPS) + { + /* separate entries with a newline */ + if (clientdetail.len != 0) + appendStringInfoChar(&clientdetail, '\n'); + appendStringInfo(&clientdetail, _("drop cascades to %s"), + objDesc); + numReportedClient++; + } + else + numNotReportedClient++; + /* separate entries with a newline */ + if (logdetail.len != 0) + appendStringInfoChar(&logdetail, '\n'); + appendStringInfo(&logdetail, _("drop cascades to %s"), + objDesc); + } + + pfree(objDesc); + } + + if (numNotReportedClient > 0) + appendStringInfo(&clientdetail, _("\nand %d other objects " + "(see server log for list)"), + numNotReportedClient); + + if (!ok) + { + if (origObject) + ereport(ERROR, + (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), + errmsg("cannot drop %s because other objects depend on it", + getObjectDescription(origObject)), + errdetail("%s", clientdetail.data), + errdetail_log("%s", logdetail.data), + errhint("Use DROP ... CASCADE to drop the dependent objects too."))); + else + ereport(ERROR, + (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), + errmsg("cannot drop desired object(s) because other objects depend on them"), + errdetail("%s", clientdetail.data), + errdetail_log("%s", logdetail.data), + errhint("Use DROP ... CASCADE to drop the dependent objects too."))); + } + else if (numReportedClient > 1) + { + ereport(msglevel, + /* translator: %d always has a value larger than 1 */ + (errmsg("drop cascades to %d other objects", + numReportedClient + numNotReportedClient), + errdetail("%s", clientdetail.data), + errdetail_log("%s", logdetail.data))); + } + else if (numReportedClient == 1) + { + /* we just use the single item as-is */ + ereport(msglevel, + (errmsg_internal("%s", clientdetail.data))); + } + + pfree(clientdetail.data); + pfree(logdetail.data); +} /* - * deleteDependentObjects - find and delete objects that depend on 'object' - * - * Scan pg_depend records that link to the given object, showing - * the things that depend on it. Recursively delete those things. (We - * don't delete the pg_depend records here, as the recursive call will - * do that.) Note it's important to delete the dependent objects - * before the referenced one, since the deletion routines might do - * things like try to update the pg_class record when deleting a check - * constraint. - * - * When dropping a whole object (subId = 0), find pg_depend records for - * its sub-objects too. - * - * object: the object to find dependencies on - * objDescription: description of object (only used for error messages) - * behavior: desired drop behavior - * oktodelete: stuff that's AUTO-deletable - * depRel: already opened pg_depend relation - * - * Returns TRUE if all is well, false if any problem found. + * deleteOneObject: delete a single object for performDeletion. * - * NOTE: because we are using SnapshotNow, if a recursive call deletes - * any pg_depend tuples that our scan hasn't yet visited, we will not - * see them as good when we do visit them. This is essential for - * correct behavior if there are multiple dependency paths between two - * objects --- else we might try to delete an already-deleted object. + * depRel is the already-open pg_depend relation. */ -static bool -deleteDependentObjects(const ObjectAddress *object, - const char *objDescription, - DropBehavior behavior, - int msglevel, - ObjectAddresses *oktodelete, - Relation depRel) +static void +deleteOneObject(const ObjectAddress *object, Relation depRel) { - bool ok = true; ScanKeyData key[3]; int nkeys; SysScanDesc scan; HeapTuple tup; - ObjectAddress otherObject; - ScanKeyEntryInitialize(&key[0], 0x0, - Anum_pg_depend_refclassid, F_OIDEQ, - ObjectIdGetDatum(object->classId)); - ScanKeyEntryInitialize(&key[1], 0x0, - Anum_pg_depend_refobjid, F_OIDEQ, - ObjectIdGetDatum(object->objectId)); + /* + * First remove any pg_depend records that link from this object to + * others. (Any records linking to this object should be gone already.) + * + * When dropping a whole object (subId = 0), remove all pg_depend records + * for its sub-objects too. + */ + ScanKeyInit(&key[0], + Anum_pg_depend_classid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->classId)); + ScanKeyInit(&key[1], + Anum_pg_depend_objid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); if (object->objectSubId != 0) { - ScanKeyEntryInitialize(&key[2], 0x0, - Anum_pg_depend_refobjsubid, F_INT4EQ, - Int32GetDatum(object->objectSubId)); + ScanKeyInit(&key[2], + Anum_pg_depend_objsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum(object->objectSubId)); nkeys = 3; } else nkeys = 2; - scan = systable_beginscan(depRel, DependReferenceIndex, true, + scan = systable_beginscan(depRel, DependDependerIndexId, true, SnapshotNow, nkeys, key); while (HeapTupleIsValid(tup = systable_getnext(scan))) { - Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup); - - otherObject.classId = foundDep->classid; - otherObject.objectId = foundDep->objid; - otherObject.objectSubId = foundDep->objsubid; - - switch (foundDep->deptype) - { - case DEPENDENCY_NORMAL: - - /* - * Perhaps there was another dependency path that would - * have allowed silent deletion of the otherObject, had we - * only taken that path first. In that case, act like this - * link is AUTO, too. - */ - if (object_address_present(&otherObject, oktodelete)) - ereport(DEBUG2, - (errmsg("drop auto-cascades to %s", - getObjectDescription(&otherObject)))); - else if (behavior == DROP_RESTRICT) - { - ereport(msglevel, - (errmsg("%s depends on %s", - getObjectDescription(&otherObject), - objDescription))); - ok = false; - } - else - ereport(msglevel, - (errmsg("drop cascades to %s", - getObjectDescription(&otherObject)))); + simple_heap_delete(depRel, &tup->t_self); + } - if (!recursiveDeletion(&otherObject, behavior, msglevel, - object, oktodelete, depRel)) - ok = false; - break; - case DEPENDENCY_AUTO: - case DEPENDENCY_INTERNAL: + systable_endscan(scan); - /* - * We propagate the DROP without complaint even in the - * RESTRICT case. (However, normal dependencies on the - * component object could still cause failure.) - */ - ereport(DEBUG2, - (errmsg("drop auto-cascades to %s", - getObjectDescription(&otherObject)))); + /* + * Now delete the object itself, in an object-type-dependent way. + */ + doDeletion(object); - if (!recursiveDeletion(&otherObject, behavior, msglevel, - object, oktodelete, depRel)) - ok = false; - break; - case DEPENDENCY_PIN: + /* + * Delete any comments associated with this object. (This is a convenient + * place to do it instead of having every object type know to do it.) + */ + DeleteComments(object->objectId, object->classId, object->objectSubId); - /* - * For a PIN dependency we just ereport immediately; there - * won't be any others to report. - */ - ereport(ERROR, - (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), - errmsg("cannot drop %s because it is required by the database system", - objDescription))); - break; - default: - elog(ERROR, "unrecognized dependency type '%c' for %s", - foundDep->deptype, objDescription); - break; - } - } + /* + * Delete shared dependency references related to this object. Sub-objects + * (columns) don't have dependencies on global objects, so skip them. + */ + if (object->objectSubId == 0) + deleteSharedDependencyRecordsFor(object->classId, object->objectId); - systable_endscan(scan); + /* + * CommandCounterIncrement here to ensure that preceding changes are all + * visible to the next deletion step. + */ + CommandCounterIncrement(); - return ok; + /* + * And we're done! + */ } - /* * doDeletion: actually delete a single object */ @@ -806,6 +1065,18 @@ doDeletion(const ObjectAddress *object) RemoveOpClassById(object->objectId); break; + case OCLASS_OPFAMILY: + RemoveOpFamilyById(object->objectId); + break; + + case OCLASS_AMOP: + RemoveAmOpEntryById(object->objectId); + break; + + case OCLASS_AMPROC: + RemoveAmProcEntryById(object->objectId); + break; + case OCLASS_REWRITE: RemoveRewriteRuleById(object->objectId); break; @@ -818,12 +1089,62 @@ doDeletion(const ObjectAddress *object) RemoveSchemaById(object->objectId); break; + case OCLASS_TSPARSER: + RemoveTSParserById(object->objectId); + break; + + case OCLASS_TSDICT: + RemoveTSDictionaryById(object->objectId); + break; + + case OCLASS_TSTEMPLATE: + RemoveTSTemplateById(object->objectId); + break; + + case OCLASS_TSCONFIG: + RemoveTSConfigurationById(object->objectId); + break; + + /* OCLASS_ROLE, OCLASS_DATABASE, OCLASS_TBLSPACE not handled */ + default: elog(ERROR, "unrecognized object class: %u", object->classId); } } +/* + * AcquireDeletionLock - acquire a suitable lock for deleting an object + * + * We use LockRelation for relations, LockDatabaseObject for everything + * else. Note that dependency.c is not concerned with deleting any kind of + * shared-across-databases object, so we have no need for LockSharedObject. + */ +static void +AcquireDeletionLock(const ObjectAddress *object) +{ + if (object->classId == RelationRelationId) + LockRelationOid(object->objectId, AccessExclusiveLock); + else + /* assume we should lock the whole object not a sub-object */ + LockDatabaseObject(object->classId, object->objectId, 0, + AccessExclusiveLock); +} + +/* + * ReleaseDeletionLock - release an object deletion lock + */ +static void +ReleaseDeletionLock(const ObjectAddress *object) +{ + if (object->classId == RelationRelationId) + UnlockRelationOid(object->objectId, AccessExclusiveLock); + else + /* assume we should lock the whole object not a sub-object */ + UnlockDatabaseObject(object->classId, object->objectId, 0, + AccessExclusiveLock); +} + /* * recordDependencyOnExpr - find expression dependencies * @@ -837,11 +1158,6 @@ doDeletion(const ObjectAddress *object) * * rtable is the rangetable to be used to interpret Vars with varlevelsup=0. * It can be NIL if no such variables are expected. - * - * XXX is it important to create dependencies on the datatypes mentioned in - * the expression? In most cases this would be redundant (eg, a ref to an - * operator indirectly references its input and output datatypes), but I'm - * not quite convinced there are no cases where we need it. */ void recordDependencyOnExpr(const ObjectAddress *depender, @@ -850,23 +1166,23 @@ recordDependencyOnExpr(const ObjectAddress *depender, { find_expr_references_context context; - init_object_addresses(&context.addrs); + context.addrs = new_object_addresses(); /* Set up interpretation for Vars at varlevelsup = 0 */ - context.rtables = makeList1(rtable); + context.rtables = list_make1(rtable); /* Scan the expression tree for referenceable objects */ find_expr_references_walker(expr, &context); /* Remove any duplicates */ - eliminate_duplicate_dependencies(&context.addrs); + eliminate_duplicate_dependencies(context.addrs); /* And record 'em */ recordMultipleDependencies(depender, - context.addrs.refs, context.addrs.numrefs, + context.addrs->refs, context.addrs->numrefs, behavior); - term_object_addresses(&context.addrs); + free_object_addresses(context.addrs); } /* @@ -887,7 +1203,7 @@ recordDependencyOnSingleRelExpr(const ObjectAddress *depender, find_expr_references_context context; RangeTblEntry rte; - init_object_addresses(&context.addrs); + context.addrs = new_object_addresses(); /* We gin up a rather bogus rangetable list to handle Vars */ MemSet(&rte, 0, sizeof(rte)); @@ -895,63 +1211,60 @@ recordDependencyOnSingleRelExpr(const ObjectAddress *depender, rte.rtekind = RTE_RELATION; rte.relid = relId; - context.rtables = makeList1(makeList1(&rte)); + context.rtables = list_make1(list_make1(&rte)); /* Scan the expression tree for referenceable objects */ find_expr_references_walker(expr, &context); /* Remove any duplicates */ - eliminate_duplicate_dependencies(&context.addrs); + eliminate_duplicate_dependencies(context.addrs); /* Separate self-dependencies if necessary */ - if (behavior != self_behavior && context.addrs.numrefs > 0) + if (behavior != self_behavior && context.addrs->numrefs > 0) { - ObjectAddresses self_addrs; + ObjectAddresses *self_addrs; ObjectAddress *outobj; int oldref, outrefs; - init_object_addresses(&self_addrs); + self_addrs = new_object_addresses(); - outobj = context.addrs.refs; + outobj = context.addrs->refs; outrefs = 0; - for (oldref = 0; oldref < context.addrs.numrefs; oldref++) + for (oldref = 0; oldref < context.addrs->numrefs; oldref++) { - ObjectAddress *thisobj = context.addrs.refs + oldref; + ObjectAddress *thisobj = context.addrs->refs + oldref; - if (thisobj->classId == RelOid_pg_class && + if (thisobj->classId == RelationRelationId && thisobj->objectId == relId) { /* Move this ref into self_addrs */ - add_object_address(OCLASS_CLASS, relId, thisobj->objectSubId, - &self_addrs); + add_exact_object_address(thisobj, self_addrs); } else { /* Keep it in context.addrs */ - outobj->classId = thisobj->classId; - outobj->objectId = thisobj->objectId; - outobj->objectSubId = thisobj->objectSubId; + *outobj = *thisobj; outobj++; outrefs++; } } - context.addrs.numrefs = outrefs; + context.addrs->numrefs = outrefs; /* Record the self-dependencies */ recordMultipleDependencies(depender, - self_addrs.refs, self_addrs.numrefs, + self_addrs->refs, self_addrs->numrefs, self_behavior); - term_object_addresses(&self_addrs); + free_object_addresses(self_addrs); } /* Record the external dependencies */ recordMultipleDependencies(depender, - context.addrs.refs, context.addrs.numrefs, + context.addrs->refs, context.addrs->numrefs, behavior); - term_object_addresses(&context.addrs); + free_object_addresses(context.addrs); } /* @@ -962,6 +1275,13 @@ recordDependencyOnSingleRelExpr(const ObjectAddress *depender, * To do so, we do not scan the joinaliasvars list of a join RTE while * scanning the query rangetable, but instead scan each individual entry * of the alias list when we find a reference to it. + * + * Note: in many cases we do not need to create dependencies on the datatypes + * involved in an expression, because we'll have an indirect dependency via + * some other object. For instance Var nodes depend on a column which depends + * on the datatype, and OpExpr nodes depend on the operator which depends on + * the datatype. However we do need a type dependency if there is no such + * indirect dependency, as for example in Const and CoerceToDomain nodes. */ static bool find_expr_references_walker(Node *node, @@ -972,32 +1292,28 @@ find_expr_references_walker(Node *node, if (IsA(node, Var)) { Var *var = (Var *) node; - int levelsup; - List *rtable, - *rtables; + List *rtable; RangeTblEntry *rte; /* Find matching rtable entry, or complain if not found */ - levelsup = var->varlevelsup; - rtables = context->rtables; - while (levelsup--) - { - if (rtables == NIL) - break; - rtables = lnext(rtables); - } - if (rtables == NIL) + if (var->varlevelsup >= list_length(context->rtables)) elog(ERROR, "invalid varlevelsup %d", var->varlevelsup); - rtable = lfirst(rtables); - if (var->varno <= 0 || var->varno > length(rtable)) + rtable = (List *) list_nth(context->rtables, var->varlevelsup); + if (var->varno <= 0 || var->varno > list_length(rtable)) elog(ERROR, "invalid varno %d", var->varno); rte = rt_fetch(var->varno, rtable); + + /* + * A whole-row Var references no specific columns, so adds no new + * dependency. + */ + if (var->varattno == InvalidAttrNumber) + return false; if (rte->rtekind == RTE_RELATION) { /* If it's a plain relation, reference this column */ - /* NB: this code works for whole-row Var with attno 0, too */ add_object_address(OCLASS_CLASS, rte->relid, var->varattno, - &context->addrs); + context->addrs); } else if (rte->rtekind == RTE_JOIN) { @@ -1006,23 +1322,106 @@ find_expr_references_walker(Node *node, /* We must make the context appropriate for join's level */ save_rtables = context->rtables; - context->rtables = rtables; + context->rtables = list_copy_tail(context->rtables, + var->varlevelsup); if (var->varattno <= 0 || - var->varattno > length(rte->joinaliasvars)) + var->varattno > list_length(rte->joinaliasvars)) elog(ERROR, "invalid varattno %d", var->varattno); - find_expr_references_walker((Node *) nth(var->varattno - 1, - rte->joinaliasvars), + find_expr_references_walker((Node *) list_nth(rte->joinaliasvars, + var->varattno - 1), context); + list_free(context->rtables); context->rtables = save_rtables; } return false; } + if (IsA(node, Const)) + { + Const *con = (Const *) node; + Oid objoid; + + /* A constant must depend on the constant's datatype */ + add_object_address(OCLASS_TYPE, con->consttype, 0, + context->addrs); + + /* + * If it's a regclass or similar literal referring to an existing + * object, add a reference to that object. (Currently, only the + * regclass and regconfig cases have any likely use, but we may as + * well handle all the OID-alias datatypes consistently.) + */ + if (!con->constisnull) + { + switch (con->consttype) + { + case REGPROCOID: + case REGPROCEDUREOID: + objoid = DatumGetObjectId(con->constvalue); + if (SearchSysCacheExists(PROCOID, + ObjectIdGetDatum(objoid), + 0, 0, 0)) + add_object_address(OCLASS_PROC, objoid, 0, + context->addrs); + break; + case REGOPEROID: + case REGOPERATOROID: + objoid = DatumGetObjectId(con->constvalue); + if (SearchSysCacheExists(OPEROID, + ObjectIdGetDatum(objoid), + 0, 0, 0)) + add_object_address(OCLASS_OPERATOR, objoid, 0, + context->addrs); + break; + case REGCLASSOID: + objoid = DatumGetObjectId(con->constvalue); + if (SearchSysCacheExists(RELOID, + ObjectIdGetDatum(objoid), + 0, 0, 0)) + add_object_address(OCLASS_CLASS, objoid, 0, + context->addrs); + break; + case REGTYPEOID: + objoid = DatumGetObjectId(con->constvalue); + if (SearchSysCacheExists(TYPEOID, + ObjectIdGetDatum(objoid), + 0, 0, 0)) + add_object_address(OCLASS_TYPE, objoid, 0, + context->addrs); + break; + case REGCONFIGOID: + objoid = DatumGetObjectId(con->constvalue); + if (SearchSysCacheExists(TSCONFIGOID, + ObjectIdGetDatum(objoid), + 0, 0, 0)) + add_object_address(OCLASS_TSCONFIG, objoid, 0, + context->addrs); + break; + case REGDICTIONARYOID: + objoid = DatumGetObjectId(con->constvalue); + if (SearchSysCacheExists(TSDICTOID, + ObjectIdGetDatum(objoid), + 0, 0, 0)) + add_object_address(OCLASS_TSDICT, objoid, 0, + context->addrs); + break; + } + } + return false; + } + if (IsA(node, Param)) + { + Param *param = (Param *) node; + + /* A parameter must depend on the parameter's datatype */ + add_object_address(OCLASS_TYPE, param->paramtype, 0, + context->addrs); + } if (IsA(node, FuncExpr)) { FuncExpr *funcexpr = (FuncExpr *) node; add_object_address(OCLASS_PROC, funcexpr->funcid, 0, - &context->addrs); + context->addrs); /* fall through to examine arguments */ } if (IsA(node, OpExpr)) @@ -1030,7 +1429,7 @@ find_expr_references_walker(Node *node, OpExpr *opexpr = (OpExpr *) node; add_object_address(OCLASS_OPERATOR, opexpr->opno, 0, - &context->addrs); + context->addrs); /* fall through to examine arguments */ } if (IsA(node, DistinctExpr)) @@ -1038,7 +1437,7 @@ find_expr_references_walker(Node *node, DistinctExpr *distinctexpr = (DistinctExpr *) node; add_object_address(OCLASS_OPERATOR, distinctexpr->opno, 0, - &context->addrs); + context->addrs); /* fall through to examine arguments */ } if (IsA(node, ScalarArrayOpExpr)) @@ -1046,7 +1445,7 @@ find_expr_references_walker(Node *node, ScalarArrayOpExpr *opexpr = (ScalarArrayOpExpr *) node; add_object_address(OCLASS_OPERATOR, opexpr->opno, 0, - &context->addrs); + context->addrs); /* fall through to examine arguments */ } if (IsA(node, NullIfExpr)) @@ -1054,7 +1453,7 @@ find_expr_references_walker(Node *node, NullIfExpr *nullifexpr = (NullIfExpr *) node; add_object_address(OCLASS_OPERATOR, nullifexpr->opno, 0, - &context->addrs); + context->addrs); /* fall through to examine arguments */ } if (IsA(node, Aggref)) @@ -1062,47 +1461,115 @@ find_expr_references_walker(Node *node, Aggref *aggref = (Aggref *) node; add_object_address(OCLASS_PROC, aggref->aggfnoid, 0, - &context->addrs); + context->addrs); /* fall through to examine arguments */ } - if (IsA(node, SubLink)) + if (is_subplan(node)) { - SubLink *sublink = (SubLink *) node; - List *opid; + /* Extra work needed here if we ever need this case */ + elog(ERROR, "already-planned subqueries not supported"); + } + if (IsA(node, RelabelType)) + { + RelabelType *relab = (RelabelType *) node; + + /* since there is no function dependency, need to depend on type */ + add_object_address(OCLASS_TYPE, relab->resulttype, 0, + context->addrs); + } + if (IsA(node, CoerceViaIO)) + { + CoerceViaIO *iocoerce = (CoerceViaIO *) node; + + /* since there is no exposed function, need to depend on type */ + add_object_address(OCLASS_TYPE, iocoerce->resulttype, 0, + context->addrs); + } + if (IsA(node, ArrayCoerceExpr)) + { + ArrayCoerceExpr *acoerce = (ArrayCoerceExpr *) node; + + if (OidIsValid(acoerce->elemfuncid)) + add_object_address(OCLASS_PROC, acoerce->elemfuncid, 0, + context->addrs); + add_object_address(OCLASS_TYPE, acoerce->resulttype, 0, + context->addrs); + /* fall through to examine arguments */ + } + if (IsA(node, ConvertRowtypeExpr)) + { + ConvertRowtypeExpr *cvt = (ConvertRowtypeExpr *) node; + + /* since there is no function dependency, need to depend on type */ + add_object_address(OCLASS_TYPE, cvt->resulttype, 0, + context->addrs); + } + if (IsA(node, RowExpr)) + { + RowExpr *rowexpr = (RowExpr *) node; + + add_object_address(OCLASS_TYPE, rowexpr->row_typeid, 0, + context->addrs); + } + if (IsA(node, RowCompareExpr)) + { + RowCompareExpr *rcexpr = (RowCompareExpr *) node; + ListCell *l; - foreach(opid, sublink->operOids) + foreach(l, rcexpr->opnos) { - add_object_address(OCLASS_OPERATOR, lfirsto(opid), 0, - &context->addrs); + add_object_address(OCLASS_OPERATOR, lfirst_oid(l), 0, + context->addrs); + } + foreach(l, rcexpr->opfamilies) + { + add_object_address(OCLASS_OPFAMILY, lfirst_oid(l), 0, + context->addrs); } /* fall through to examine arguments */ } - if (is_subplan(node)) + if (IsA(node, CoerceToDomain)) { - /* Extra work needed here if we ever need this case */ - elog(ERROR, "already-planned subqueries not supported"); + CoerceToDomain *cd = (CoerceToDomain *) node; + + add_object_address(OCLASS_TYPE, cd->resulttype, 0, + context->addrs); } if (IsA(node, Query)) { /* Recurse into RTE subquery or not-yet-planned sublink subquery */ Query *query = (Query *) node; - List *rtable; + ListCell *rtable; bool result; /* - * Add whole-relation refs for each plain relation mentioned in - * the subquery's rtable. (Note: query_tree_walker takes care of - * recursing into RTE_FUNCTION and RTE_SUBQUERY RTEs, so no need - * to do that here. But keep it from looking at join alias - * lists.) + * Add whole-relation refs for each plain relation mentioned in the + * subquery's rtable, as well as datatype refs for any datatypes used + * as a RECORD function's output. (Note: query_tree_walker takes care + * of recursing into RTE_FUNCTION and RTE_SUBQUERY RTEs, so no need to + * do that here. But keep it from looking at join alias lists.) */ foreach(rtable, query->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable); + ListCell *ct; - if (rte->rtekind == RTE_RELATION) - add_object_address(OCLASS_CLASS, rte->relid, 0, - &context->addrs); + switch (rte->rtekind) + { + case RTE_RELATION: + add_object_address(OCLASS_CLASS, rte->relid, 0, + context->addrs); + break; + case RTE_FUNCTION: + foreach(ct, rte->funccoltypes) + { + add_object_address(OCLASS_TYPE, lfirst_oid(ct), 0, + context->addrs); + } + break; + default: + break; + } } /* Examine substructure of query */ @@ -1111,7 +1578,7 @@ find_expr_references_walker(Node *node, find_expr_references_walker, (void *) context, QTW_IGNORE_JOINALIASES); - context->rtables = lnext(context->rtables); + context->rtables = list_delete_first(context->rtables); return result; } return expression_tree_walker(node, find_expr_references_walker, @@ -1128,6 +1595,13 @@ eliminate_duplicate_dependencies(ObjectAddresses *addrs) int oldref, newrefs; + /* + * We can't sort if the array has "extra" data, because there's no way + * to keep it in sync. Fortunately that combination of features is + * not needed. + */ + Assert(!addrs->extras); + if (addrs->numrefs <= 1) return; /* nothing to do */ @@ -1149,11 +1623,11 @@ eliminate_duplicate_dependencies(ObjectAddresses *addrs) continue; /* identical, so drop thisobj */ /* - * If we have a whole-object reference and a reference to a - * part of the same object, we don't need the whole-object - * reference (for example, we don't need to reference both - * table foo and column foo.bar). The whole-object reference - * will always appear first in the sorted list. + * If we have a whole-object reference and a reference to a part + * of the same object, we don't need the whole-object reference + * (for example, we don't need to reference both table foo and + * column foo.bar). The whole-object reference will always appear + * first in the sorted list. */ if (priorobj->objectSubId == 0) { @@ -1164,9 +1638,7 @@ eliminate_duplicate_dependencies(ObjectAddresses *addrs) } /* Not identical, so add thisobj to output set */ priorobj++; - priorobj->classId = thisobj->classId; - priorobj->objectId = thisobj->objectId; - priorobj->objectSubId = thisobj->objectSubId; + *priorobj = *thisobj; newrefs++; } @@ -1205,32 +1677,60 @@ object_address_comparator(const void *a, const void *b) /* * Routines for handling an expansible array of ObjectAddress items. * - * init_object_addresses: initialize an ObjectAddresses array. + * new_object_addresses: create a new ObjectAddresses array. */ -static void -init_object_addresses(ObjectAddresses *addrs) +ObjectAddresses * +new_object_addresses(void) { - /* Initialize array to empty */ + ObjectAddresses *addrs; + + addrs = palloc(sizeof(ObjectAddresses)); + addrs->numrefs = 0; - addrs->maxrefs = 32; /* arbitrary initial array size */ + addrs->maxrefs = 32; addrs->refs = (ObjectAddress *) palloc(addrs->maxrefs * sizeof(ObjectAddress)); + addrs->extras = NULL; /* until/unless needed */ + + return addrs; +} + +/* + * Add an entry to an ObjectAddresses array. + * + * It is convenient to specify the class by ObjectClass rather than directly + * by catalog OID. + */ +static void +add_object_address(ObjectClass oclass, Oid objectId, int32 subId, + ObjectAddresses *addrs) +{ + ObjectAddress *item; - /* Initialize object_classes[] if not done yet */ - /* This will be needed by add_object_address() */ - if (!object_classes_initialized) - init_object_classes(); + /* enlarge array if needed */ + if (addrs->numrefs >= addrs->maxrefs) + { + addrs->maxrefs *= 2; + addrs->refs = (ObjectAddress *) + repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress)); + Assert(!addrs->extras); + } + /* record this item */ + item = addrs->refs + addrs->numrefs; + item->classId = object_classes[oclass]; + item->objectId = objectId; + item->objectSubId = subId; + addrs->numrefs++; } /* * Add an entry to an ObjectAddresses array. * - * It is convenient to specify the class by ObjectClass rather than directly - * by catalog OID. + * As above, but specify entry exactly. */ -static void -add_object_address(ObjectClasses oclass, Oid objectId, int32 subId, - ObjectAddresses *addrs) +void +add_exact_object_address(const ObjectAddress *object, + ObjectAddresses *addrs) { ObjectAddress *item; @@ -1240,25 +1740,31 @@ add_object_address(ObjectClasses oclass, Oid objectId, int32 subId, addrs->maxrefs *= 2; addrs->refs = (ObjectAddress *) repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress)); + Assert(!addrs->extras); } /* record this item */ item = addrs->refs + addrs->numrefs; - item->classId = object_classes[oclass]; - item->objectId = objectId; - item->objectSubId = subId; + *item = *object; addrs->numrefs++; } /* * Add an entry to an ObjectAddresses array. * - * As above, but specify entry exactly. + * As above, but specify entry exactly and provide some "extra" data too. */ static void -add_exact_object_address(const ObjectAddress *object, - ObjectAddresses *addrs) +add_exact_object_address_extra(const ObjectAddress *object, + const ObjectAddressExtra *extra, + ObjectAddresses *addrs) { ObjectAddress *item; + ObjectAddressExtra *itemextra; + + /* allocate extra space if first time */ + if (!addrs->extras) + addrs->extras = (ObjectAddressExtra *) + palloc(addrs->maxrefs * sizeof(ObjectAddressExtra)); /* enlarge array if needed */ if (addrs->numrefs >= addrs->maxrefs) @@ -1266,10 +1772,14 @@ add_exact_object_address(const ObjectAddress *object, addrs->maxrefs *= 2; addrs->refs = (ObjectAddress *) repalloc(addrs->refs, addrs->maxrefs * sizeof(ObjectAddress)); + addrs->extras = (ObjectAddressExtra *) + repalloc(addrs->extras, addrs->maxrefs * sizeof(ObjectAddressExtra)); } /* record this item */ item = addrs->refs + addrs->numrefs; *item = *object; + itemextra = addrs->extras + addrs->numrefs; + *itemextra = *extra; addrs->numrefs++; } @@ -1278,15 +1788,15 @@ add_exact_object_address(const ObjectAddress *object, * * We return "true" if object is a subobject of something in the array, too. */ -static bool +bool object_address_present(const ObjectAddress *object, - ObjectAddresses *addrs) + const ObjectAddresses *addrs) { int i; for (i = addrs->numrefs - 1; i >= 0; i--) { - ObjectAddress *thisobj = addrs->refs + i; + const ObjectAddress *thisobj = addrs->refs + i; if (object->classId == thisobj->classId && object->objectId == thisobj->objectId) @@ -1301,122 +1811,178 @@ object_address_present(const ObjectAddress *object, } /* - * Clean up when done with an ObjectAddresses array. + * As above, except that if the object is present then also OR the given + * flags into its associated extra data (which must exist). */ -static void -term_object_addresses(ObjectAddresses *addrs) +static bool +object_address_present_add_flags(const ObjectAddress *object, + int flags, + ObjectAddresses *addrs) { - pfree(addrs->refs); + int i; + + for (i = addrs->numrefs - 1; i >= 0; i--) + { + ObjectAddress *thisobj = addrs->refs + i; + + if (object->classId == thisobj->classId && + object->objectId == thisobj->objectId) + { + if (object->objectSubId == thisobj->objectSubId) + { + ObjectAddressExtra *thisextra = addrs->extras + i; + + thisextra->flags |= flags; + return true; + } + if (thisobj->objectSubId == 0) + { + /* + * We get here if we find a need to delete a column after + * having already decided to drop its whole table. Obviously + * we no longer need to drop the column. But don't plaster + * its flags on the table. + */ + return true; + } + } + } + + return false; } /* - * Initialize the object_classes[] table. - * - * Although some of these OIDs aren't compile-time constants, they surely - * shouldn't change during a backend's run. So, we look them up the - * first time through and then cache them. + * Record multiple dependencies from an ObjectAddresses array, after first + * removing any duplicates. */ -static void -init_object_classes(void) +void +record_object_address_dependencies(const ObjectAddress *depender, + ObjectAddresses *referenced, + DependencyType behavior) +{ + eliminate_duplicate_dependencies(referenced); + recordMultipleDependencies(depender, + referenced->refs, referenced->numrefs, + behavior); +} + +/* + * Clean up when done with an ObjectAddresses array. + */ +void +free_object_addresses(ObjectAddresses *addrs) { - object_classes[OCLASS_CLASS] = RelOid_pg_class; - object_classes[OCLASS_PROC] = RelOid_pg_proc; - object_classes[OCLASS_TYPE] = RelOid_pg_type; - object_classes[OCLASS_CAST] = get_system_catalog_relid(CastRelationName); - object_classes[OCLASS_CONSTRAINT] = get_system_catalog_relid(ConstraintRelationName); - object_classes[OCLASS_CONVERSION] = get_system_catalog_relid(ConversionRelationName); - object_classes[OCLASS_DEFAULT] = get_system_catalog_relid(AttrDefaultRelationName); - object_classes[OCLASS_LANGUAGE] = get_system_catalog_relid(LanguageRelationName); - object_classes[OCLASS_OPERATOR] = get_system_catalog_relid(OperatorRelationName); - object_classes[OCLASS_OPCLASS] = get_system_catalog_relid(OperatorClassRelationName); - object_classes[OCLASS_REWRITE] = get_system_catalog_relid(RewriteRelationName); - object_classes[OCLASS_TRIGGER] = get_system_catalog_relid(TriggerRelationName); - object_classes[OCLASS_SCHEMA] = get_system_catalog_relid(NamespaceRelationName); - object_classes_initialized = true; + pfree(addrs->refs); + if (addrs->extras) + pfree(addrs->extras); + pfree(addrs); } /* * Determine the class of a given object identified by objectAddress. * - * This function is needed just because some of the system catalogs do - * not have hardwired-at-compile-time OIDs. + * This function is essentially the reverse mapping for the object_classes[] + * table. We implement it as a function because the OIDs aren't consecutive. */ -static ObjectClasses +ObjectClass getObjectClass(const ObjectAddress *object) { - /* Easy for the bootstrapped catalogs... */ switch (object->classId) { - case RelOid_pg_class: + case RelationRelationId: /* caller must check objectSubId */ return OCLASS_CLASS; - case RelOid_pg_proc: + case ProcedureRelationId: Assert(object->objectSubId == 0); return OCLASS_PROC; - case RelOid_pg_type: + case TypeRelationId: Assert(object->objectSubId == 0); return OCLASS_TYPE; - } - /* - * Handle cases where catalog's OID is not hardwired. - */ - if (!object_classes_initialized) - init_object_classes(); + case CastRelationId: + Assert(object->objectSubId == 0); + return OCLASS_CAST; - if (object->classId == object_classes[OCLASS_CAST]) - { - Assert(object->objectSubId == 0); - return OCLASS_CAST; - } - if (object->classId == object_classes[OCLASS_CONSTRAINT]) - { - Assert(object->objectSubId == 0); - return OCLASS_CONSTRAINT; - } - if (object->classId == object_classes[OCLASS_CONVERSION]) - { - Assert(object->objectSubId == 0); - return OCLASS_CONVERSION; - } - if (object->classId == object_classes[OCLASS_DEFAULT]) - { - Assert(object->objectSubId == 0); - return OCLASS_DEFAULT; - } - if (object->classId == object_classes[OCLASS_LANGUAGE]) - { - Assert(object->objectSubId == 0); - return OCLASS_LANGUAGE; - } - if (object->classId == object_classes[OCLASS_OPERATOR]) - { - Assert(object->objectSubId == 0); - return OCLASS_OPERATOR; - } - if (object->classId == object_classes[OCLASS_OPCLASS]) - { - Assert(object->objectSubId == 0); - return OCLASS_OPCLASS; - } - if (object->classId == object_classes[OCLASS_REWRITE]) - { - Assert(object->objectSubId == 0); - return OCLASS_REWRITE; - } - if (object->classId == object_classes[OCLASS_TRIGGER]) - { - Assert(object->objectSubId == 0); - return OCLASS_TRIGGER; - } - if (object->classId == object_classes[OCLASS_SCHEMA]) - { - Assert(object->objectSubId == 0); - return OCLASS_SCHEMA; + case ConstraintRelationId: + Assert(object->objectSubId == 0); + return OCLASS_CONSTRAINT; + + case ConversionRelationId: + Assert(object->objectSubId == 0); + return OCLASS_CONVERSION; + + case AttrDefaultRelationId: + Assert(object->objectSubId == 0); + return OCLASS_DEFAULT; + + case LanguageRelationId: + Assert(object->objectSubId == 0); + return OCLASS_LANGUAGE; + + case OperatorRelationId: + Assert(object->objectSubId == 0); + return OCLASS_OPERATOR; + + case OperatorClassRelationId: + Assert(object->objectSubId == 0); + return OCLASS_OPCLASS; + + case OperatorFamilyRelationId: + Assert(object->objectSubId == 0); + return OCLASS_OPFAMILY; + + case AccessMethodOperatorRelationId: + Assert(object->objectSubId == 0); + return OCLASS_AMOP; + + case AccessMethodProcedureRelationId: + Assert(object->objectSubId == 0); + return OCLASS_AMPROC; + + case RewriteRelationId: + Assert(object->objectSubId == 0); + return OCLASS_REWRITE; + + case TriggerRelationId: + Assert(object->objectSubId == 0); + return OCLASS_TRIGGER; + + case NamespaceRelationId: + Assert(object->objectSubId == 0); + return OCLASS_SCHEMA; + + case TSParserRelationId: + Assert(object->objectSubId == 0); + return OCLASS_TSPARSER; + + case TSDictionaryRelationId: + Assert(object->objectSubId == 0); + return OCLASS_TSDICT; + + case TSTemplateRelationId: + Assert(object->objectSubId == 0); + return OCLASS_TSTEMPLATE; + + case TSConfigRelationId: + Assert(object->objectSubId == 0); + return OCLASS_TSCONFIG; + + case AuthIdRelationId: + Assert(object->objectSubId == 0); + return OCLASS_ROLE; + + case DatabaseRelationId: + Assert(object->objectSubId == 0); + return OCLASS_DATABASE; + + case TableSpaceRelationId: + Assert(object->objectSubId == 0); + return OCLASS_TBLSPACE; } + /* shouldn't get here */ elog(ERROR, "unrecognized object class: %u", object->classId); return OCLASS_CLASS; /* keep compiler quiet */ } @@ -1426,7 +1992,7 @@ getObjectClass(const ObjectAddress *object) * * The result is a palloc'd string. */ -static char * +char * getObjectDescription(const ObjectAddress *object) { StringInfoData buffer; @@ -1438,18 +2004,18 @@ getObjectDescription(const ObjectAddress *object) case OCLASS_CLASS: getRelationDescription(&buffer, object->objectId); if (object->objectSubId != 0) - appendStringInfo(&buffer, " column %s", - get_attname(object->objectId, - object->objectSubId)); + appendStringInfo(&buffer, _(" column %s"), + get_relid_attribute_name(object->objectId, + object->objectSubId)); break; case OCLASS_PROC: - appendStringInfo(&buffer, "function %s", + appendStringInfo(&buffer, _("function %s"), format_procedure(object->objectId)); break; case OCLASS_TYPE: - appendStringInfo(&buffer, "type %s", + appendStringInfo(&buffer, _("type %s"), format_type_be(object->objectId)); break; @@ -1461,13 +2027,14 @@ getObjectDescription(const ObjectAddress *object) HeapTuple tup; Form_pg_cast castForm; - castDesc = heap_openr(CastRelationName, AccessShareLock); + castDesc = heap_open(CastRelationId, AccessShareLock); - ScanKeyEntryInitialize(&skey[0], 0x0, - ObjectIdAttributeNumber, F_OIDEQ, - ObjectIdGetDatum(object->objectId)); + ScanKeyInit(&skey[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); - rcscan = systable_beginscan(castDesc, CastOidIndex, true, + rcscan = systable_beginscan(castDesc, CastOidIndexId, true, SnapshotNow, 1, skey); tup = systable_getnext(rcscan); @@ -1478,7 +2045,7 @@ getObjectDescription(const ObjectAddress *object) castForm = (Form_pg_cast) GETSTRUCT(tup); - appendStringInfo(&buffer, "cast from %s to %s", + appendStringInfo(&buffer, _("cast from %s to %s"), format_type_be(castForm->castsource), format_type_be(castForm->casttarget)); @@ -1489,43 +2056,30 @@ getObjectDescription(const ObjectAddress *object) case OCLASS_CONSTRAINT: { - Relation conDesc; - ScanKeyData skey[1]; - SysScanDesc rcscan; - HeapTuple tup; + HeapTuple conTup; Form_pg_constraint con; - conDesc = heap_openr(ConstraintRelationName, AccessShareLock); - - ScanKeyEntryInitialize(&skey[0], 0x0, - ObjectIdAttributeNumber, F_OIDEQ, - ObjectIdGetDatum(object->objectId)); - - rcscan = systable_beginscan(conDesc, ConstraintOidIndex, true, - SnapshotNow, 1, skey); - - tup = systable_getnext(rcscan); - - if (!HeapTupleIsValid(tup)) - elog(ERROR, "could not find tuple for constraint %u", + conTup = SearchSysCache(CONSTROID, + ObjectIdGetDatum(object->objectId), + 0, 0, 0); + if (!HeapTupleIsValid(conTup)) + elog(ERROR, "cache lookup failed for constraint %u", object->objectId); - - con = (Form_pg_constraint) GETSTRUCT(tup); + con = (Form_pg_constraint) GETSTRUCT(conTup); if (OidIsValid(con->conrelid)) { - appendStringInfo(&buffer, "constraint %s on ", + appendStringInfo(&buffer, _("constraint %s on "), NameStr(con->conname)); getRelationDescription(&buffer, con->conrelid); } else { - appendStringInfo(&buffer, "constraint %s", + appendStringInfo(&buffer, _("constraint %s"), NameStr(con->conname)); } - systable_endscan(rcscan); - heap_close(conDesc, AccessShareLock); + ReleaseSysCache(conTup); break; } @@ -1533,14 +2087,14 @@ getObjectDescription(const ObjectAddress *object) { HeapTuple conTup; - conTup = SearchSysCache(CONOID, - ObjectIdGetDatum(object->objectId), + conTup = SearchSysCache(CONVOID, + ObjectIdGetDatum(object->objectId), 0, 0, 0); if (!HeapTupleIsValid(conTup)) elog(ERROR, "cache lookup failed for conversion %u", object->objectId); - appendStringInfo(&buffer, "conversion %s", - NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname)); + appendStringInfo(&buffer, _("conversion %s"), + NameStr(((Form_pg_conversion) GETSTRUCT(conTup))->conname)); ReleaseSysCache(conTup); break; } @@ -1554,14 +2108,15 @@ getObjectDescription(const ObjectAddress *object) Form_pg_attrdef attrdef; ObjectAddress colobject; - attrdefDesc = heap_openr(AttrDefaultRelationName, AccessShareLock); + attrdefDesc = heap_open(AttrDefaultRelationId, AccessShareLock); - ScanKeyEntryInitialize(&skey[0], 0x0, - ObjectIdAttributeNumber, F_OIDEQ, - ObjectIdGetDatum(object->objectId)); + ScanKeyInit(&skey[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); - adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndex, true, - SnapshotNow, 1, skey); + adscan = systable_beginscan(attrdefDesc, AttrDefaultOidIndexId, + true, SnapshotNow, 1, skey); tup = systable_getnext(adscan); @@ -1571,11 +2126,11 @@ getObjectDescription(const ObjectAddress *object) attrdef = (Form_pg_attrdef) GETSTRUCT(tup); - colobject.classId = RelOid_pg_class; + colobject.classId = RelationRelationId; colobject.objectId = attrdef->adrelid; colobject.objectSubId = attrdef->adnum; - appendStringInfo(&buffer, "default for %s", + appendStringInfo(&buffer, _("default for %s"), getObjectDescription(&colobject)); systable_endscan(adscan); @@ -1588,19 +2143,19 @@ getObjectDescription(const ObjectAddress *object) HeapTuple langTup; langTup = SearchSysCache(LANGOID, - ObjectIdGetDatum(object->objectId), + ObjectIdGetDatum(object->objectId), 0, 0, 0); if (!HeapTupleIsValid(langTup)) elog(ERROR, "cache lookup failed for language %u", object->objectId); - appendStringInfo(&buffer, "language %s", - NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname)); + appendStringInfo(&buffer, _("language %s"), + NameStr(((Form_pg_language) GETSTRUCT(langTup))->lanname)); ReleaseSysCache(langTup); break; } case OCLASS_OPERATOR: - appendStringInfo(&buffer, "operator %s", + appendStringInfo(&buffer, _("operator %s"), format_operator(object->objectId)); break; @@ -1613,32 +2168,30 @@ getObjectDescription(const ObjectAddress *object) char *nspname; opcTup = SearchSysCache(CLAOID, - ObjectIdGetDatum(object->objectId), + ObjectIdGetDatum(object->objectId), 0, 0, 0); if (!HeapTupleIsValid(opcTup)) elog(ERROR, "cache lookup failed for opclass %u", object->objectId); opcForm = (Form_pg_opclass) GETSTRUCT(opcTup); + amTup = SearchSysCache(AMOID, + ObjectIdGetDatum(opcForm->opcmethod), + 0, 0, 0); + if (!HeapTupleIsValid(amTup)) + elog(ERROR, "cache lookup failed for access method %u", + opcForm->opcmethod); + amForm = (Form_pg_am) GETSTRUCT(amTup); + /* Qualify the name if not visible in search path */ if (OpclassIsVisible(object->objectId)) nspname = NULL; else nspname = get_namespace_name(opcForm->opcnamespace); - appendStringInfo(&buffer, "operator class %s", + appendStringInfo(&buffer, _("operator class %s for access method %s"), quote_qualified_identifier(nspname, - NameStr(opcForm->opcname))); - - amTup = SearchSysCache(AMOID, - ObjectIdGetDatum(opcForm->opcamid), - 0, 0, 0); - if (!HeapTupleIsValid(amTup)) - elog(ERROR, "cache lookup failed for access method %u", - opcForm->opcamid); - amForm = (Form_pg_am) GETSTRUCT(amTup); - - appendStringInfo(&buffer, " for %s", + NameStr(opcForm->opcname)), NameStr(amForm->amname)); ReleaseSysCache(amTup); @@ -1646,6 +2199,102 @@ getObjectDescription(const ObjectAddress *object) break; } + case OCLASS_OPFAMILY: + getOpFamilyDescription(&buffer, object->objectId); + break; + + case OCLASS_AMOP: + { + Relation amopDesc; + ScanKeyData skey[1]; + SysScanDesc amscan; + HeapTuple tup; + Form_pg_amop amopForm; + StringInfoData opfam; + + amopDesc = heap_open(AccessMethodOperatorRelationId, + AccessShareLock); + + ScanKeyInit(&skey[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); + + amscan = systable_beginscan(amopDesc, AccessMethodOperatorOidIndexId, true, + SnapshotNow, 1, skey); + + tup = systable_getnext(amscan); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "could not find tuple for amop entry %u", + object->objectId); + + amopForm = (Form_pg_amop) GETSTRUCT(tup); + + initStringInfo(&opfam); + getOpFamilyDescription(&opfam, amopForm->amopfamily); + /* + * translator: %d is the operator strategy (a number), the + * first %s is the textual form of the operator, and the second + * %s is the description of the operator family. + */ + appendStringInfo(&buffer, _("operator %d %s of %s"), + amopForm->amopstrategy, + format_operator(amopForm->amopopr), + opfam.data); + pfree(opfam.data); + + systable_endscan(amscan); + heap_close(amopDesc, AccessShareLock); + break; + } + + case OCLASS_AMPROC: + { + Relation amprocDesc; + ScanKeyData skey[1]; + SysScanDesc amscan; + HeapTuple tup; + Form_pg_amproc amprocForm; + StringInfoData opfam; + + amprocDesc = heap_open(AccessMethodProcedureRelationId, + AccessShareLock); + + ScanKeyInit(&skey[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); + + amscan = systable_beginscan(amprocDesc, AccessMethodProcedureOidIndexId, true, + SnapshotNow, 1, skey); + + tup = systable_getnext(amscan); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "could not find tuple for amproc entry %u", + object->objectId); + + amprocForm = (Form_pg_amproc) GETSTRUCT(tup); + + initStringInfo(&opfam); + getOpFamilyDescription(&opfam, amprocForm->amprocfamily); + /* + * translator: %d is the function number, the first %s is the + * textual form of the function with arguments, and the second + * %s is the description of the operator family. + */ + appendStringInfo(&buffer, _("function %d %s of %s"), + amprocForm->amprocnum, + format_procedure(amprocForm->amproc), + opfam.data); + pfree(opfam.data); + + systable_endscan(amscan); + heap_close(amprocDesc, AccessShareLock); + break; + } + case OCLASS_REWRITE: { Relation ruleDesc; @@ -1654,13 +2303,14 @@ getObjectDescription(const ObjectAddress *object) HeapTuple tup; Form_pg_rewrite rule; - ruleDesc = heap_openr(RewriteRelationName, AccessShareLock); + ruleDesc = heap_open(RewriteRelationId, AccessShareLock); - ScanKeyEntryInitialize(&skey[0], 0x0, - ObjectIdAttributeNumber, F_OIDEQ, - ObjectIdGetDatum(object->objectId)); + ScanKeyInit(&skey[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); - rcscan = systable_beginscan(ruleDesc, RewriteOidIndex, true, + rcscan = systable_beginscan(ruleDesc, RewriteOidIndexId, true, SnapshotNow, 1, skey); tup = systable_getnext(rcscan); @@ -1671,7 +2321,7 @@ getObjectDescription(const ObjectAddress *object) rule = (Form_pg_rewrite) GETSTRUCT(tup); - appendStringInfo(&buffer, "rule %s on ", + appendStringInfo(&buffer, _("rule %s on "), NameStr(rule->rulename)); getRelationDescription(&buffer, rule->ev_class); @@ -1688,13 +2338,14 @@ getObjectDescription(const ObjectAddress *object) HeapTuple tup; Form_pg_trigger trig; - trigDesc = heap_openr(TriggerRelationName, AccessShareLock); + trigDesc = heap_open(TriggerRelationId, AccessShareLock); - ScanKeyEntryInitialize(&skey[0], 0x0, - ObjectIdAttributeNumber, F_OIDEQ, - ObjectIdGetDatum(object->objectId)); + ScanKeyInit(&skey[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); - tgscan = systable_beginscan(trigDesc, TriggerOidIndex, true, + tgscan = systable_beginscan(trigDesc, TriggerOidIndexId, true, SnapshotNow, 1, skey); tup = systable_getnext(tgscan); @@ -1705,7 +2356,7 @@ getObjectDescription(const ObjectAddress *object) trig = (Form_pg_trigger) GETSTRUCT(tup); - appendStringInfo(&buffer, "trigger %s on ", + appendStringInfo(&buffer, _("trigger %s on "), NameStr(trig->tgname)); getRelationDescription(&buffer, trig->tgrelid); @@ -1722,7 +2373,102 @@ getObjectDescription(const ObjectAddress *object) if (!nspname) elog(ERROR, "cache lookup failed for namespace %u", object->objectId); - appendStringInfo(&buffer, "schema %s", nspname); + appendStringInfo(&buffer, _("schema %s"), nspname); + break; + } + + case OCLASS_TSPARSER: + { + HeapTuple tup; + + tup = SearchSysCache(TSPARSEROID, + ObjectIdGetDatum(object->objectId), + 0, 0, 0); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for text search parser %u", + object->objectId); + appendStringInfo(&buffer, _("text search parser %s"), + NameStr(((Form_pg_ts_parser) GETSTRUCT(tup))->prsname)); + ReleaseSysCache(tup); + break; + } + + case OCLASS_TSDICT: + { + HeapTuple tup; + + tup = SearchSysCache(TSDICTOID, + ObjectIdGetDatum(object->objectId), + 0, 0, 0); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for text search dictionary %u", + object->objectId); + appendStringInfo(&buffer, _("text search dictionary %s"), + NameStr(((Form_pg_ts_dict) GETSTRUCT(tup))->dictname)); + ReleaseSysCache(tup); + break; + } + + case OCLASS_TSTEMPLATE: + { + HeapTuple tup; + + tup = SearchSysCache(TSTEMPLATEOID, + ObjectIdGetDatum(object->objectId), + 0, 0, 0); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for text search template %u", + object->objectId); + appendStringInfo(&buffer, _("text search template %s"), + NameStr(((Form_pg_ts_template) GETSTRUCT(tup))->tmplname)); + ReleaseSysCache(tup); + break; + } + + case OCLASS_TSCONFIG: + { + HeapTuple tup; + + tup = SearchSysCache(TSCONFIGOID, + ObjectIdGetDatum(object->objectId), + 0, 0, 0); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for text search configuration %u", + object->objectId); + appendStringInfo(&buffer, _("text search configuration %s"), + NameStr(((Form_pg_ts_config) GETSTRUCT(tup))->cfgname)); + ReleaseSysCache(tup); + break; + } + + case OCLASS_ROLE: + { + appendStringInfo(&buffer, _("role %s"), + GetUserNameFromId(object->objectId)); + break; + } + + case OCLASS_DATABASE: + { + char *datname; + + datname = get_database_name(object->objectId); + if (!datname) + elog(ERROR, "cache lookup failed for database %u", + object->objectId); + appendStringInfo(&buffer, _("database %s"), datname); + break; + } + + case OCLASS_TBLSPACE: + { + char *tblspace; + + tblspace = get_tablespace_name(object->objectId); + if (!tblspace) + elog(ERROR, "cache lookup failed for tablespace %u", + object->objectId); + appendStringInfo(&buffer, _("tablespace %s"), tblspace); break; } @@ -1766,43 +2512,81 @@ getRelationDescription(StringInfo buffer, Oid relid) switch (relForm->relkind) { case RELKIND_RELATION: - appendStringInfo(buffer, "table %s", + appendStringInfo(buffer, _("table %s"), relname); break; case RELKIND_INDEX: - appendStringInfo(buffer, "index %s", - relname); - break; - case RELKIND_SPECIAL: - appendStringInfo(buffer, "special system relation %s", + appendStringInfo(buffer, _("index %s"), relname); break; case RELKIND_SEQUENCE: - appendStringInfo(buffer, "sequence %s", + appendStringInfo(buffer, _("sequence %s"), relname); break; case RELKIND_UNCATALOGED: - appendStringInfo(buffer, "uncataloged table %s", + appendStringInfo(buffer, _("uncataloged table %s"), relname); break; case RELKIND_TOASTVALUE: - appendStringInfo(buffer, "toast table %s", + appendStringInfo(buffer, _("toast table %s"), relname); break; case RELKIND_VIEW: - appendStringInfo(buffer, "view %s", + appendStringInfo(buffer, _("view %s"), relname); break; case RELKIND_COMPOSITE_TYPE: - appendStringInfo(buffer, "composite type %s", + appendStringInfo(buffer, _("composite type %s"), relname); break; default: /* shouldn't get here */ - appendStringInfo(buffer, "relation %s", + appendStringInfo(buffer, _("relation %s"), relname); break; } ReleaseSysCache(relTup); } + +/* + * subroutine for getObjectDescription: describe an operator family + */ +static void +getOpFamilyDescription(StringInfo buffer, Oid opfid) +{ + HeapTuple opfTup; + Form_pg_opfamily opfForm; + HeapTuple amTup; + Form_pg_am amForm; + char *nspname; + + opfTup = SearchSysCache(OPFAMILYOID, + ObjectIdGetDatum(opfid), + 0, 0, 0); + if (!HeapTupleIsValid(opfTup)) + elog(ERROR, "cache lookup failed for opfamily %u", opfid); + opfForm = (Form_pg_opfamily) GETSTRUCT(opfTup); + + amTup = SearchSysCache(AMOID, + ObjectIdGetDatum(opfForm->opfmethod), + 0, 0, 0); + if (!HeapTupleIsValid(amTup)) + elog(ERROR, "cache lookup failed for access method %u", + opfForm->opfmethod); + amForm = (Form_pg_am) GETSTRUCT(amTup); + + /* Qualify the name if not visible in search path */ + if (OpfamilyIsVisible(opfid)) + nspname = NULL; + else + nspname = get_namespace_name(opfForm->opfnamespace); + + appendStringInfo(buffer, _("operator family %s for access method %s"), + quote_qualified_identifier(nspname, + NameStr(opfForm->opfname)), + NameStr(amForm->amname)); + + ReleaseSysCache(amTup); + ReleaseSysCache(opfTup); +}