X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fcommands%2Ftypecmds.c;h=3de29b766dea32293f1fdded61f327940381eab5;hb=59a2111b23f6ceec4c777d68e20c1027d3c57c6f;hp=5069c5759ec60e730a70f98d2145dc78b200e156;hpb=82a4a777d94bec965ab2f1d04b6e6a3f0447b377;p=postgresql diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index 5069c5759e..3de29b766d 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -3,7 +3,7 @@ * typecmds.c * Routines for SQL commands that manipulate types (and domains). * - * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * @@ -31,18 +31,24 @@ */ #include "postgres.h" -#include "access/genam.h" -#include "access/heapam.h" +#include "access/htup_details.h" #include "access/xact.h" +#include "catalog/binary_upgrade.h" #include "catalog/catalog.h" -#include "catalog/dependency.h" #include "catalog/heap.h" -#include "catalog/indexing.h" +#include "catalog/objectaccess.h" +#include "catalog/pg_am.h" +#include "catalog/pg_authid.h" #include "catalog/pg_collation.h" #include "catalog/pg_constraint.h" +#include "catalog/pg_constraint_fn.h" #include "catalog/pg_depend.h" #include "catalog/pg_enum.h" +#include "catalog/pg_language.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_proc_fn.h" +#include "catalog/pg_range.h" #include "catalog/pg_type.h" #include "catalog/pg_type_fn.h" #include "commands/defrem.h" @@ -51,21 +57,20 @@ #include "executor/executor.h" #include "miscadmin.h" #include "nodes/makefuncs.h" -#include "optimizer/planner.h" #include "optimizer/var.h" #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_func.h" #include "parser/parse_type.h" -#include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/ruleutils.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" -#include "utils/tqual.h" /* result structure for get_rels_with_domain() */ @@ -77,9 +82,11 @@ typedef struct /* atts[] is of allocated length RelationGetNumberOfAttributes(rel) */ } RelToCheck; -/* Potentially set by contrib/pg_upgrade_support functions */ +/* Potentially set by pg_upgrade_support functions */ Oid binary_upgrade_next_array_pg_type_oid = InvalidOid; +static void makeRangeConstructors(const char *name, Oid namespace, + Oid rangeOid, Oid subtype); static Oid findTypeInputFunction(List *procname, Oid typeOid); static Oid findTypeOutputFunction(List *procname, Oid typeOid); static Oid findTypeReceiveFunction(List *procname, Oid typeOid); @@ -87,21 +94,23 @@ static Oid findTypeSendFunction(List *procname, Oid typeOid); static Oid findTypeTypmodinFunction(List *procname); static Oid findTypeTypmodoutFunction(List *procname); static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid); -static void validateDomainConstraint(Oid domainoid, char *ccbin); +static Oid findRangeSubOpclass(List *opcname, Oid subtype); +static Oid findRangeCanonicalFunction(List *procname, Oid typeOid); +static Oid findRangeSubtypeDiffFunction(List *procname, Oid subtype); +static void validateDomainConstraint(Oid domainoid, char *ccbin); static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode); -static void checkDomainOwner(HeapTuple tup); static void checkEnumOwner(HeapTuple tup); static char *domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, int typMod, Constraint *constr, - char *domainName); + char *domainName, ObjectAddress *constrAddr); /* * DefineType * Registers a new base type. */ -void +ObjectAddress DefineType(List *names, List *parameters) { char *typeName; @@ -153,6 +162,7 @@ DefineType(List *names, List *parameters) Oid typoid; Oid resulttype; ListCell *pl; + ObjectAddress address; /* * As of Postgres 8.4, we require superuser privilege to create a base @@ -206,7 +216,8 @@ DefineType(List *names, List *parameters) */ if (!OidIsValid(typoid)) { - typoid = TypeShellMake(typeName, typeNamespace, GetUserId()); + address = TypeShellMake(typeName, typeNamespace, GetUserId()); + typoid = address.objectId; /* Make new shell type visible for modification below */ CommandCounterIncrement(); @@ -215,7 +226,7 @@ DefineType(List *names, List *parameters) * creating the shell type was all we're supposed to do. */ if (parameters == NIL) - return; + return address; } else { @@ -439,14 +450,14 @@ DefineType(List *names, List *parameters) { /* backwards-compatibility hack */ ereport(WARNING, - (errmsg("changing return type of function %s from \"opaque\" to %s", - NameListToString(inputName), typeName))); + (errmsg("changing return type of function %s from \"%s\" to \"%s\"", + NameListToString(inputName), "opaque", typeName))); SetFunctionReturnType(inputOid, typoid); } else ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("type input function %s must return type %s", + errmsg("type input function %s must return type \"%s\"", NameListToString(inputName), typeName))); } resulttype = get_func_rettype(outputOid); @@ -456,15 +467,15 @@ DefineType(List *names, List *parameters) { /* backwards-compatibility hack */ ereport(WARNING, - (errmsg("changing return type of function %s from \"opaque\" to \"cstring\"", - NameListToString(outputName)))); + (errmsg("changing return type of function %s from \"%s\" to \"%s\"", + NameListToString(outputName), "opaque", "cstring"))); SetFunctionReturnType(outputOid, CSTRINGOID); } else ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("type output function %s must return type \"cstring\"", - NameListToString(outputName)))); + errmsg("type output function %s must return type \"%s\"", + NameListToString(outputName), "cstring"))); } if (receiveOid) { @@ -472,7 +483,7 @@ DefineType(List *names, List *parameters) if (resulttype != typoid) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("type receive function %s must return type %s", + errmsg("type receive function %s must return type \"%s\"", NameListToString(receiveName), typeName))); } if (sendOid) @@ -481,8 +492,8 @@ DefineType(List *names, List *parameters) if (resulttype != BYTEAOID) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("type send function %s must return type \"bytea\"", - NameListToString(sendName)))); + errmsg("type send function %s must return type \"%s\"", + NameListToString(sendName), "bytea"))); } /* @@ -501,8 +512,8 @@ DefineType(List *names, List *parameters) analyzeOid = findTypeAnalyzeFunction(analyzeName, typoid); /* - * Check permissions on functions. We choose to require the creator/owner - * of a type to also own the underlying functions. Since creating a type + * Check permissions on functions. We choose to require the creator/owner + * of a type to also own the underlying functions. Since creating a type * is tantamount to granting public execute access on the functions, the * minimum sane check would be for execute-with-grant-option. But we * don't have a way to make the type go away if the grant option is @@ -533,16 +544,62 @@ DefineType(List *names, List *parameters) NameListToString(analyzeName)); #endif + /* + * Print warnings if any of the type's I/O functions are marked volatile. + * There is a general assumption that I/O functions are stable or + * immutable; this allows us for example to mark record_in/record_out + * stable rather than volatile. Ideally we would throw errors not just + * warnings here; but since this check is new as of 9.5, and since the + * volatility marking might be just an error-of-omission and not a true + * indication of how the function behaves, we'll let it pass as a warning + * for now. + */ + if (inputOid && func_volatile(inputOid) == PROVOLATILE_VOLATILE) + ereport(WARNING, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type input function %s should not be volatile", + NameListToString(inputName)))); + if (outputOid && func_volatile(outputOid) == PROVOLATILE_VOLATILE) + ereport(WARNING, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type output function %s should not be volatile", + NameListToString(outputName)))); + if (receiveOid && func_volatile(receiveOid) == PROVOLATILE_VOLATILE) + ereport(WARNING, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type receive function %s should not be volatile", + NameListToString(receiveName)))); + if (sendOid && func_volatile(sendOid) == PROVOLATILE_VOLATILE) + ereport(WARNING, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type send function %s should not be volatile", + NameListToString(sendName)))); + if (typmodinOid && func_volatile(typmodinOid) == PROVOLATILE_VOLATILE) + ereport(WARNING, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type modifier input function %s should not be volatile", + NameListToString(typmodinName)))); + if (typmodoutOid && func_volatile(typmodoutOid) == PROVOLATILE_VOLATILE) + ereport(WARNING, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("type modifier output function %s should not be volatile", + NameListToString(typmodoutName)))); + + /* + * OK, we're done checking, time to make the type. We must assign the + * array type OID ahead of calling TypeCreate, since the base type and + * array type each refer to the other. + */ array_oid = AssignTypeArrayOid(); /* * now have TypeCreate do all the real work. * * Note: the pg_type.oid is stored in user tables as array elements (base - * types) in ArrayType and in composite types in DatumTupleFields. This + * types) in ArrayType and in composite types in DatumTupleFields. This * oid must be preserved by binary upgrades. */ - typoid = + address = TypeCreate(InvalidOid, /* no predetermined type OID */ typeName, /* type name */ typeNamespace, /* namespace */ @@ -574,6 +631,7 @@ DefineType(List *names, List *parameters) 0, /* Array Dimensions of typbasetype */ false, /* Type NOT NULL */ collation); /* type's collation */ + Assert(typoid == address.objectId); /* * Create the array type that goes with it. @@ -600,7 +658,7 @@ DefineType(List *names, List *parameters) F_ARRAY_SEND, /* send procedure */ typmodinOid, /* typmodin procedure */ typmodoutOid, /* typmodout procedure */ - InvalidOid, /* analyze procedure - default */ + F_ARRAY_TYPANALYZE, /* analyze procedure */ typoid, /* element type ID */ true, /* yes this is an array type */ InvalidOid, /* no further array type */ @@ -616,6 +674,8 @@ DefineType(List *names, List *parameters) collation); /* type's collation */ pfree(array_type); + + return address; } /* @@ -643,6 +703,14 @@ RemoveTypeById(Oid typeOid) if (((Form_pg_type) GETSTRUCT(tup))->typtype == TYPTYPE_ENUM) EnumValuesDelete(typeOid); + /* + * If it is a range type, delete the pg_range entry too; we don't bother + * with making a dependency entry for that, so it has to be done "by hand" + * here. + */ + if (((Form_pg_type) GETSTRUCT(tup))->typtype == TYPTYPE_RANGE) + RangeDelete(typeOid); + ReleaseSysCache(tup); heap_close(relation, RowExclusiveLock); @@ -653,7 +721,7 @@ RemoveTypeById(Oid typeOid) * DefineDomain * Registers a new domain. */ -void +ObjectAddress DefineDomain(CreateDomainStmt *stmt) { char *domainName; @@ -683,12 +751,12 @@ DefineDomain(CreateDomainStmt *stmt) List *schema = stmt->constraints; ListCell *listptr; Oid basetypeoid; - Oid domainoid; Oid old_type_oid; Oid domaincoll; Form_pg_type baseType; int32 basetypeMod; Oid baseColl; + ObjectAddress address; /* Convert list of names to a name and namespace */ domainNamespace = QualifiedNameGetCreationNamespace(stmt->domainname, @@ -702,7 +770,7 @@ DefineDomain(CreateDomainStmt *stmt) get_namespace_name(domainNamespace)); /* - * Check for collision with an existing type name. If there is one and + * Check for collision with an existing type name. If there is one and * it's an autogenerated array, we can rename it out of the way. */ old_type_oid = GetSysCacheOid2(TYPENAMENSP, @@ -724,19 +792,25 @@ DefineDomain(CreateDomainStmt *stmt) basetypeoid = HeapTupleGetOid(typeTup); /* - * Base type must be a plain base type, another domain or an enum. Domains - * over pseudotypes would create a security hole. Domains over composite - * types might be made to work in the future, but not today. + * Base type must be a plain base type, another domain, an enum or a range + * type. Domains over pseudotypes would create a security hole. Domains + * over composite types might be made to work in the future, but not + * today. */ typtype = baseType->typtype; if (typtype != TYPTYPE_BASE && typtype != TYPTYPE_DOMAIN && - typtype != TYPTYPE_ENUM) + typtype != TYPTYPE_ENUM && + typtype != TYPTYPE_RANGE) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("\"%s\" is not a valid base type for a domain", TypeNameToString(stmt->typeName)))); + aclresult = pg_type_aclcheck(basetypeoid, GetUserId(), ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error_type(aclresult, basetypeoid); + /* * Identify the collation if any */ @@ -862,9 +936,7 @@ DefineDomain(CreateDomainStmt *stmt) */ defaultValue = deparse_expression(defaultExpr, - deparse_context_for(domainName, - InvalidOid), - false, false); + NIL, false, false); defaultValueBin = nodeToString(defaultExpr); } } @@ -898,8 +970,14 @@ DefineDomain(CreateDomainStmt *stmt) /* * Check constraints are handled after domain creation, as - * they require the Oid of the domain + * they require the Oid of the domain; at this point we can + * only check that they're not marked NO INHERIT, because that + * would be bogus. */ + if (constr->is_no_inherit) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("check constraints for domains cannot be marked NO INHERIT"))); break; /* @@ -948,7 +1026,7 @@ DefineDomain(CreateDomainStmt *stmt) /* * Have TypeCreate do all the real work. */ - domainoid = + address = TypeCreate(InvalidOid, /* no predetermined type OID */ domainName, /* type name */ domainNamespace, /* namespace */ @@ -993,9 +1071,9 @@ DefineDomain(CreateDomainStmt *stmt) switch (constr->contype) { case CONSTR_CHECK: - domainAddConstraint(domainoid, domainNamespace, + domainAddConstraint(address.objectId, domainNamespace, basetypeoid, basetypeMod, - constr, domainName); + constr, domainName, NULL); break; /* Other constraint types were fully processed above */ @@ -1012,6 +1090,8 @@ DefineDomain(CreateDomainStmt *stmt) * Now we can clean up. */ ReleaseSysCache(typeTup); + + return address; } @@ -1019,16 +1099,16 @@ DefineDomain(CreateDomainStmt *stmt) * DefineEnum * Registers a new enum. */ -void +ObjectAddress DefineEnum(CreateEnumStmt *stmt) { char *enumName; char *enumArrayName; Oid enumNamespace; - Oid enumTypeOid; AclResult aclresult; Oid old_type_oid; Oid enumArrayOid; + ObjectAddress enumTypeAddr; /* Convert list of names to a name and namespace */ enumNamespace = QualifiedNameGetCreationNamespace(stmt->typeName, @@ -1041,7 +1121,7 @@ DefineEnum(CreateEnumStmt *stmt) get_namespace_name(enumNamespace)); /* - * Check for collision with an existing type name. If there is one and + * Check for collision with an existing type name. If there is one and * it's an autogenerated array, we can rename it out of the way. */ old_type_oid = GetSysCacheOid2(TYPENAMENSP, @@ -1058,7 +1138,7 @@ DefineEnum(CreateEnumStmt *stmt) enumArrayOid = AssignTypeArrayOid(); /* Create the pg_type entry */ - enumTypeOid = + enumTypeAddr = TypeCreate(InvalidOid, /* no predetermined type OID */ enumName, /* type name */ enumNamespace, /* namespace */ @@ -1092,7 +1172,7 @@ DefineEnum(CreateEnumStmt *stmt) InvalidOid); /* type's collation */ /* Enter the enum's values into pg_enum */ - EnumValuesCreate(enumTypeOid, stmt->vals); + EnumValuesCreate(enumTypeAddr.objectId, stmt->vals); /* * Create the array type that goes with it. @@ -1116,8 +1196,8 @@ DefineEnum(CreateEnumStmt *stmt) F_ARRAY_SEND, /* send procedure */ InvalidOid, /* typmodin procedure - none */ InvalidOid, /* typmodout procedure - none */ - InvalidOid, /* analyze procedure - default */ - enumTypeOid, /* element type ID */ + F_ARRAY_TYPANALYZE, /* analyze procedure */ + enumTypeAddr.objectId, /* element type ID */ true, /* yes this is an array type */ InvalidOid, /* no further array type */ InvalidOid, /* base type ID */ @@ -1132,18 +1212,21 @@ DefineEnum(CreateEnumStmt *stmt) InvalidOid); /* type's collation */ pfree(enumArrayName); + + return enumTypeAddr; } /* * AlterEnum * Adds a new label to an existing enum. */ -void -AlterEnum(AlterEnumStmt *stmt) +ObjectAddress +AlterEnum(AlterEnumStmt *stmt, bool isTopLevel) { Oid enum_type_oid; TypeName *typename; HeapTuple tup; + ObjectAddress address; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(stmt->typeName); @@ -1153,14 +1236,40 @@ AlterEnum(AlterEnumStmt *stmt) if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", enum_type_oid); + /* + * Ordinarily we disallow adding values within transaction blocks, because + * we can't cope with enum OID values getting into indexes and then having + * their defining pg_enum entries go away. However, it's okay if the enum + * type was created in the current transaction, since then there can be no + * such indexes that wouldn't themselves go away on rollback. (We support + * this case because pg_dump --binary-upgrade needs it.) We test this by + * seeing if the pg_type row has xmin == current XID and is not + * HEAP_UPDATED. If it is HEAP_UPDATED, we can't be sure whether the type + * was created or only modified in this xact. So we are disallowing some + * cases that could theoretically be safe; but fortunately pg_dump only + * needs the simplest case. + */ + if (HeapTupleHeaderGetXmin(tup->t_data) == GetCurrentTransactionId() && + !(tup->t_data->t_infomask & HEAP_UPDATED)) + /* safe to do inside transaction block */ ; + else + PreventTransactionChain(isTopLevel, "ALTER TYPE ... ADD"); + /* Check it's an enum and check user has permission to ALTER the enum */ checkEnumOwner(tup); /* Add the new label */ AddEnumLabel(enum_type_oid, stmt->newVal, - stmt->newValNeighbor, stmt->newValIsAfter); + stmt->newValNeighbor, stmt->newValIsAfter, + stmt->skipIfExists); + + InvokeObjectPostAlterHook(TypeRelationId, enum_type_oid, 0); + + ObjectAddressSet(address, TypeRelationId, enum_type_oid); ReleaseSysCache(tup); + + return address; } @@ -1184,8 +1293,344 @@ checkEnumOwner(HeapTuple tup) /* Permission check: must own type */ if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE, - format_type_be(HeapTupleGetOid(tup))); + aclcheck_error_type(ACLCHECK_NOT_OWNER, HeapTupleGetOid(tup)); +} + + +/* + * DefineRange + * Registers a new range type. + */ +ObjectAddress +DefineRange(CreateRangeStmt *stmt) +{ + char *typeName; + Oid typeNamespace; + Oid typoid; + char *rangeArrayName; + Oid rangeArrayOid; + Oid rangeSubtype = InvalidOid; + List *rangeSubOpclassName = NIL; + List *rangeCollationName = NIL; + List *rangeCanonicalName = NIL; + List *rangeSubtypeDiffName = NIL; + Oid rangeSubOpclass; + Oid rangeCollation; + regproc rangeCanonical; + regproc rangeSubtypeDiff; + int16 subtyplen; + bool subtypbyval; + char subtypalign; + char alignment; + AclResult aclresult; + ListCell *lc; + ObjectAddress address; + + /* Convert list of names to a name and namespace */ + typeNamespace = QualifiedNameGetCreationNamespace(stmt->typeName, + &typeName); + + /* Check we have creation rights in target namespace */ + aclresult = pg_namespace_aclcheck(typeNamespace, GetUserId(), ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_NAMESPACE, + get_namespace_name(typeNamespace)); + + /* + * Look to see if type already exists. + */ + typoid = GetSysCacheOid2(TYPENAMENSP, + CStringGetDatum(typeName), + ObjectIdGetDatum(typeNamespace)); + + /* + * If it's not a shell, see if it's an autogenerated array type, and if so + * rename it out of the way. + */ + if (OidIsValid(typoid) && get_typisdefined(typoid)) + { + if (moveArrayTypeName(typoid, typeName, typeNamespace)) + typoid = InvalidOid; + else + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("type \"%s\" already exists", typeName))); + } + + /* + * If it doesn't exist, create it as a shell, so that the OID is known for + * use in the range function definitions. + */ + if (!OidIsValid(typoid)) + { + address = TypeShellMake(typeName, typeNamespace, GetUserId()); + typoid = address.objectId; + /* Make new shell type visible for modification below */ + CommandCounterIncrement(); + } + + /* Extract the parameters from the parameter list */ + foreach(lc, stmt->params) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (pg_strcasecmp(defel->defname, "subtype") == 0) + { + if (OidIsValid(rangeSubtype)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + /* we can look up the subtype name immediately */ + rangeSubtype = typenameTypeId(NULL, defGetTypeName(defel)); + } + else if (pg_strcasecmp(defel->defname, "subtype_opclass") == 0) + { + if (rangeSubOpclassName != NIL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + rangeSubOpclassName = defGetQualifiedName(defel); + } + else if (pg_strcasecmp(defel->defname, "collation") == 0) + { + if (rangeCollationName != NIL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + rangeCollationName = defGetQualifiedName(defel); + } + else if (pg_strcasecmp(defel->defname, "canonical") == 0) + { + if (rangeCanonicalName != NIL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + rangeCanonicalName = defGetQualifiedName(defel); + } + else if (pg_strcasecmp(defel->defname, "subtype_diff") == 0) + { + if (rangeSubtypeDiffName != NIL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + rangeSubtypeDiffName = defGetQualifiedName(defel); + } + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("type attribute \"%s\" not recognized", + defel->defname))); + } + + /* Must have a subtype */ + if (!OidIsValid(rangeSubtype)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("type attribute \"subtype\" is required"))); + /* disallow ranges of pseudotypes */ + if (get_typtype(rangeSubtype) == TYPTYPE_PSEUDO) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("range subtype cannot be %s", + format_type_be(rangeSubtype)))); + + /* Identify subopclass */ + rangeSubOpclass = findRangeSubOpclass(rangeSubOpclassName, rangeSubtype); + + /* Identify collation to use, if any */ + if (type_is_collatable(rangeSubtype)) + { + if (rangeCollationName != NIL) + rangeCollation = get_collation_oid(rangeCollationName, false); + else + rangeCollation = get_typcollation(rangeSubtype); + } + else + { + if (rangeCollationName != NIL) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("range collation specified but subtype does not support collation"))); + rangeCollation = InvalidOid; + } + + /* Identify support functions, if provided */ + if (rangeCanonicalName != NIL) + rangeCanonical = findRangeCanonicalFunction(rangeCanonicalName, + typoid); + else + rangeCanonical = InvalidOid; + + if (rangeSubtypeDiffName != NIL) + rangeSubtypeDiff = findRangeSubtypeDiffFunction(rangeSubtypeDiffName, + rangeSubtype); + else + rangeSubtypeDiff = InvalidOid; + + get_typlenbyvalalign(rangeSubtype, + &subtyplen, &subtypbyval, &subtypalign); + + /* alignment must be 'i' or 'd' for ranges */ + alignment = (subtypalign == 'd') ? 'd' : 'i'; + + /* Allocate OID for array type */ + rangeArrayOid = AssignTypeArrayOid(); + + /* Create the pg_type entry */ + address = + TypeCreate(InvalidOid, /* no predetermined type OID */ + typeName, /* type name */ + typeNamespace, /* namespace */ + InvalidOid, /* relation oid (n/a here) */ + 0, /* relation kind (ditto) */ + GetUserId(), /* owner's ID */ + -1, /* internal size (always varlena) */ + TYPTYPE_RANGE, /* type-type (range type) */ + TYPCATEGORY_RANGE, /* type-category (range type) */ + false, /* range types are never preferred */ + DEFAULT_TYPDELIM, /* array element delimiter */ + F_RANGE_IN, /* input procedure */ + F_RANGE_OUT, /* output procedure */ + F_RANGE_RECV, /* receive procedure */ + F_RANGE_SEND, /* send procedure */ + InvalidOid, /* typmodin procedure - none */ + InvalidOid, /* typmodout procedure - none */ + F_RANGE_TYPANALYZE, /* analyze procedure */ + InvalidOid, /* element type ID - none */ + false, /* this is not an array type */ + rangeArrayOid, /* array type we are about to create */ + InvalidOid, /* base type ID (only for domains) */ + NULL, /* never a default type value */ + NULL, /* no binary form available either */ + false, /* never passed by value */ + alignment, /* alignment */ + 'x', /* TOAST strategy (always extended) */ + -1, /* typMod (Domains only) */ + 0, /* Array dimensions of typbasetype */ + false, /* Type NOT NULL */ + InvalidOid); /* type's collation (ranges never have one) */ + Assert(typoid == address.objectId); + + /* Create the entry in pg_range */ + RangeCreate(typoid, rangeSubtype, rangeCollation, rangeSubOpclass, + rangeCanonical, rangeSubtypeDiff); + + /* + * Create the array type that goes with it. + */ + rangeArrayName = makeArrayTypeName(typeName, typeNamespace); + + TypeCreate(rangeArrayOid, /* force assignment of this type OID */ + rangeArrayName, /* type name */ + typeNamespace, /* namespace */ + InvalidOid, /* relation oid (n/a here) */ + 0, /* relation kind (ditto) */ + GetUserId(), /* owner's ID */ + -1, /* internal size (always varlena) */ + TYPTYPE_BASE, /* type-type (base type) */ + TYPCATEGORY_ARRAY, /* type-category (array) */ + false, /* array types are never preferred */ + DEFAULT_TYPDELIM, /* array element delimiter */ + F_ARRAY_IN, /* input procedure */ + F_ARRAY_OUT, /* output procedure */ + F_ARRAY_RECV, /* receive procedure */ + F_ARRAY_SEND, /* send procedure */ + InvalidOid, /* typmodin procedure - none */ + InvalidOid, /* typmodout procedure - none */ + F_ARRAY_TYPANALYZE, /* analyze procedure */ + typoid, /* element type ID */ + true, /* yes this is an array type */ + InvalidOid, /* no further array type */ + InvalidOid, /* base type ID */ + NULL, /* never a default type value */ + NULL, /* binary default isn't sent either */ + false, /* never passed by value */ + alignment, /* alignment - same as range's */ + 'x', /* ARRAY is always toastable */ + -1, /* typMod (Domains only) */ + 0, /* Array dimensions of typbasetype */ + false, /* Type NOT NULL */ + InvalidOid); /* typcollation */ + + pfree(rangeArrayName); + + /* And create the constructor functions for this range type */ + makeRangeConstructors(typeName, typeNamespace, typoid, rangeSubtype); + + return address; +} + +/* + * Because there may exist several range types over the same subtype, the + * range type can't be uniquely determined from the subtype. So it's + * impossible to define a polymorphic constructor; we have to generate new + * constructor functions explicitly for each range type. + * + * We actually define 4 functions, with 0 through 3 arguments. This is just + * to offer more convenience for the user. + */ +static void +makeRangeConstructors(const char *name, Oid namespace, + Oid rangeOid, Oid subtype) +{ + static const char *const prosrc[2] = {"range_constructor2", + "range_constructor3"}; + static const int pronargs[2] = {2, 3}; + + Oid constructorArgTypes[3]; + ObjectAddress myself, + referenced; + int i; + + constructorArgTypes[0] = subtype; + constructorArgTypes[1] = subtype; + constructorArgTypes[2] = TEXTOID; + + referenced.classId = TypeRelationId; + referenced.objectId = rangeOid; + referenced.objectSubId = 0; + + for (i = 0; i < lengthof(prosrc); i++) + { + oidvector *constructorArgTypesVector; + + constructorArgTypesVector = buildoidvector(constructorArgTypes, + pronargs[i]); + + myself = ProcedureCreate(name, /* name: same as range type */ + namespace, /* namespace */ + false, /* replace */ + false, /* returns set */ + rangeOid, /* return type */ + BOOTSTRAP_SUPERUSERID, /* proowner */ + INTERNALlanguageId, /* language */ + F_FMGR_INTERNAL_VALIDATOR, /* language validator */ + prosrc[i], /* prosrc */ + NULL, /* probin */ + false, /* isAgg */ + false, /* isWindowFunc */ + false, /* security_definer */ + false, /* leakproof */ + false, /* isStrict */ + PROVOLATILE_IMMUTABLE, /* volatility */ + PROPARALLEL_SAFE, /* parallel safety */ + constructorArgTypesVector, /* parameterTypes */ + PointerGetDatum(NULL), /* allParameterTypes */ + PointerGetDatum(NULL), /* parameterModes */ + PointerGetDatum(NULL), /* parameterNames */ + NIL, /* parameterDefaults */ + PointerGetDatum(NULL), /* trftypes */ + PointerGetDatum(NULL), /* proconfig */ + 1.0, /* procost */ + 0.0); /* prorows */ + + /* + * Make the constructors internally-dependent on the range type so + * that they go away silently when the type is dropped. Note that + * pg_dump depends on this choice to avoid dumping the constructors. + */ + recordDependencyOn(&myself, &referenced, DEPENDENCY_INTERNAL); + } } @@ -1389,8 +1834,8 @@ findTypeTypmodinFunction(List *procname) if (get_func_rettype(procOid) != INT4OID) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("typmod_in function %s must return type \"integer\"", - NameListToString(procname)))); + errmsg("typmod_in function %s must return type \"%s\"", + NameListToString(procname), "integer"))); return procOid; } @@ -1416,8 +1861,8 @@ findTypeTypmodoutFunction(List *procname) if (get_func_rettype(procOid) != CSTRINGOID) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("typmod_out function %s must return type \"cstring\"", - NameListToString(procname)))); + errmsg("typmod_out function %s must return type \"%s\"", + NameListToString(procname), "cstring"))); return procOid; } @@ -1443,8 +1888,139 @@ findTypeAnalyzeFunction(List *procname, Oid typeOid) if (get_func_rettype(procOid) != BOOLOID) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("type analyze function %s must return type \"boolean\"", - NameListToString(procname)))); + errmsg("type analyze function %s must return type \"%s\"", + NameListToString(procname), "boolean"))); + + return procOid; +} + +/* + * Find suitable support functions and opclasses for a range type. + */ + +/* + * Find named btree opclass for subtype, or default btree opclass if + * opcname is NIL. + */ +static Oid +findRangeSubOpclass(List *opcname, Oid subtype) +{ + Oid opcid; + Oid opInputType; + + if (opcname != NIL) + { + opcid = get_opclass_oid(BTREE_AM_OID, opcname, false); + + /* + * Verify that the operator class accepts this datatype. Note we will + * accept binary compatibility. + */ + opInputType = get_opclass_input_type(opcid); + if (!IsBinaryCoercible(subtype, opInputType)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("operator class \"%s\" does not accept data type %s", + NameListToString(opcname), + format_type_be(subtype)))); + } + else + { + opcid = GetDefaultOpClass(subtype, BTREE_AM_OID); + if (!OidIsValid(opcid)) + { + /* We spell the error message identically to GetIndexOpClass */ + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("data type %s has no default operator class for access method \"%s\"", + format_type_be(subtype), "btree"), + errhint("You must specify an operator class for the range type or define a default operator class for the subtype."))); + } + } + + return opcid; +} + +static Oid +findRangeCanonicalFunction(List *procname, Oid typeOid) +{ + Oid argList[1]; + Oid procOid; + AclResult aclresult; + + /* + * Range canonical functions must take and return the range type, and must + * be immutable. + */ + argList[0] = typeOid; + + procOid = LookupFuncName(procname, 1, argList, true); + + if (!OidIsValid(procOid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("function %s does not exist", + func_signature_string(procname, 1, NIL, argList)))); + + if (get_func_rettype(procOid) != typeOid) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("range canonical function %s must return range type", + func_signature_string(procname, 1, NIL, argList)))); + + if (func_volatile(procOid) != PROVOLATILE_IMMUTABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("range canonical function %s must be immutable", + func_signature_string(procname, 1, NIL, argList)))); + + /* Also, range type's creator must have permission to call function */ + aclresult = pg_proc_aclcheck(procOid, GetUserId(), ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(procOid)); + + return procOid; +} + +static Oid +findRangeSubtypeDiffFunction(List *procname, Oid subtype) +{ + Oid argList[2]; + Oid procOid; + AclResult aclresult; + + /* + * Range subtype diff functions must take two arguments of the subtype, + * must return float8, and must be immutable. + */ + argList[0] = subtype; + argList[1] = subtype; + + procOid = LookupFuncName(procname, 2, argList, true); + + if (!OidIsValid(procOid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("function %s does not exist", + func_signature_string(procname, 2, NIL, argList)))); + + if (get_func_rettype(procOid) != FLOAT8OID) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("range subtype diff function %s must return type \"%s\"", + func_signature_string(procname, 2, NIL, argList), + "double precision"))); + + if (func_volatile(procOid) != PROVOLATILE_IMMUTABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("range subtype diff function %s must be immutable", + func_signature_string(procname, 2, NIL, argList)))); + + /* Also, range type's creator must have permission to call function */ + aclresult = pg_proc_aclcheck(procOid, GetUserId(), ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(procOid)); return procOid; } @@ -1459,9 +2035,14 @@ AssignTypeArrayOid(void) { Oid type_array_oid; - /* Use binary-upgrade override for pg_type.typarray, if supplied. */ - if (IsBinaryUpgrade && OidIsValid(binary_upgrade_next_array_pg_type_oid)) + /* Use binary-upgrade override for pg_type.typarray? */ + if (IsBinaryUpgrade) { + if (!OidIsValid(binary_upgrade_next_array_pg_type_oid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_type array OID value not set when in binary upgrade mode"))); + type_array_oid = binary_upgrade_next_array_pg_type_oid; binary_upgrade_next_array_pg_type_oid = InvalidOid; } @@ -1487,27 +2068,26 @@ AssignTypeArrayOid(void) * If the relation already exists, then 'DefineRelation' will abort * the xact... * - * DefineCompositeType returns relid for use when creating - * an implicit composite type during function creation + * Return type is the new type's object address. *------------------------------------------------------------------- */ -Oid -DefineCompositeType(const RangeVar *typevar, List *coldeflist) +ObjectAddress +DefineCompositeType(RangeVar *typevar, List *coldeflist) { CreateStmt *createStmt = makeNode(CreateStmt); Oid old_type_oid; Oid typeNamespace; - Oid relid; + ObjectAddress address; /* * now set the parameters for keys/inheritance etc. All of these are * uninteresting for composite types... */ - createStmt->relation = (RangeVar *) typevar; + createStmt->relation = typevar; createStmt->tableElts = coldeflist; createStmt->inhRelations = NIL; createStmt->constraints = NIL; - createStmt->options = list_make1(defWithOids(false)); + createStmt->options = NIL; createStmt->oncommit = ONCOMMIT_NOOP; createStmt->tablespacename = NULL; createStmt->if_not_exists = false; @@ -1518,7 +2098,8 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist) * check is here mainly to get a better error message about a "type" * instead of below about a "relation". */ - typeNamespace = RangeVarGetCreationNamespace(createStmt->relation); + typeNamespace = RangeVarGetAndCheckCreationNamespace(createStmt->relation, + NoLock, NULL); RangeVarAdjustRelationPersistence(createStmt->relation, typeNamespace); old_type_oid = GetSysCacheOid2(TYPENAMENSP, @@ -1535,17 +2116,19 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist) /* * Finally create the relation. This also creates the type. */ - relid = DefineRelation(createStmt, RELKIND_COMPOSITE_TYPE, InvalidOid); - Assert(relid != InvalidOid); - return relid; + DefineRelation(createStmt, RELKIND_COMPOSITE_TYPE, InvalidOid, &address); + + return address; } /* * AlterDomainDefault * * Routine implementing ALTER DOMAIN SET/DROP DEFAULT statements. + * + * Returns ObjectAddress of the modified domain. */ -void +ObjectAddress AlterDomainDefault(List *names, Node *defaultRaw) { TypeName *typename; @@ -1560,6 +2143,7 @@ AlterDomainDefault(List *names, Node *defaultRaw) bool new_record_repl[Natts_pg_type]; HeapTuple newtuple; Form_pg_type typTup; + ObjectAddress address; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); @@ -1618,9 +2202,7 @@ AlterDomainDefault(List *names, Node *defaultRaw) * easier for pg_dump). */ defaultValue = deparse_expression(defaultExpr, - deparse_context_for(NameStr(typTup->typname), - InvalidOid), - false, false); + NIL, false, false); /* * Form an updated tuple with the new default and write it back. @@ -1669,17 +2251,25 @@ AlterDomainDefault(List *names, Node *defaultRaw) defaultExpr, true); /* Rebuild is true */ + InvokeObjectPostAlterHook(TypeRelationId, domainoid, 0); + + ObjectAddressSet(address, TypeRelationId, domainoid); + /* Clean up */ heap_close(rel, NoLock); heap_freetuple(newtuple); + + return address; } /* * AlterDomainNotNull * * Routine implementing ALTER DOMAIN SET/DROP NOT NULL statements. + * + * Returns ObjectAddress of the modified domain. */ -void +ObjectAddress AlterDomainNotNull(List *names, bool notNull) { TypeName *typename; @@ -1687,6 +2277,7 @@ AlterDomainNotNull(List *names, bool notNull) Relation typrel; HeapTuple tup; Form_pg_type typTup; + ObjectAddress address = InvalidObjectAddress; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); @@ -1707,7 +2298,7 @@ AlterDomainNotNull(List *names, bool notNull) if (typTup->typnotnull == notNull) { heap_close(typrel, RowExclusiveLock); - return; + return address; } /* Adding a NOT NULL constraint requires checking existing columns */ @@ -1728,9 +2319,11 @@ AlterDomainNotNull(List *names, bool notNull) TupleDesc tupdesc = RelationGetDescr(testrel); HeapScanDesc scan; HeapTuple tuple; + Snapshot snapshot; /* Scan all tuples in this relation */ - scan = heap_beginscan(testrel, SnapshotNow, 0, NULL); + snapshot = RegisterSnapshot(GetLatestSnapshot()); + scan = heap_beginscan(testrel, snapshot, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { int i; @@ -1741,14 +2334,26 @@ AlterDomainNotNull(List *names, bool notNull) int attnum = rtc->atts[i]; if (heap_attisnull(tuple, attnum)) + { + /* + * In principle the auxiliary information for this + * error should be errdatatype(), but errtablecol() + * seems considerably more useful in practice. Since + * this code only executes in an ALTER DOMAIN command, + * the client should already know which domain is in + * question. + */ ereport(ERROR, (errcode(ERRCODE_NOT_NULL_VIOLATION), errmsg("column \"%s\" of table \"%s\" contains null values", NameStr(tupdesc->attrs[attnum - 1]->attname), - RelationGetRelationName(testrel)))); + RelationGetRelationName(testrel)), + errtablecol(testrel, attnum))); + } } } heap_endscan(scan); + UnregisterSnapshot(snapshot); /* Close each rel after processing, but keep lock */ heap_close(testrel, NoLock); @@ -1756,7 +2361,7 @@ AlterDomainNotNull(List *names, bool notNull) } /* - * Okay to update pg_type row. We can scribble on typTup because it's a + * Okay to update pg_type row. We can scribble on typTup because it's a * copy. */ typTup->typnotnull = notNull; @@ -1765,9 +2370,15 @@ AlterDomainNotNull(List *names, bool notNull) CatalogUpdateIndexes(typrel, tup); + InvokeObjectPostAlterHook(TypeRelationId, domainoid, 0); + + ObjectAddressSet(address, TypeRelationId, domainoid); + /* Clean up */ heap_freetuple(tup); heap_close(typrel, RowExclusiveLock); + + return address; } /* @@ -1775,9 +2386,9 @@ AlterDomainNotNull(List *names, bool notNull) * * Implements the ALTER DOMAIN DROP CONSTRAINT statement */ -void +ObjectAddress AlterDomainDropConstraint(List *names, const char *constrName, - DropBehavior behavior) + DropBehavior behavior, bool missing_ok) { TypeName *typename; Oid domainoid; @@ -1787,6 +2398,8 @@ AlterDomainDropConstraint(List *names, const char *constrName, SysScanDesc conscan; ScanKeyData key[1]; HeapTuple contup; + bool found = false; + ObjectAddress address = InvalidObjectAddress; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); @@ -1812,7 +2425,7 @@ AlterDomainDropConstraint(List *names, const char *constrName, ObjectIdGetDatum(HeapTupleGetOid(tup))); conscan = systable_beginscan(conrel, ConstraintTypidIndexId, true, - SnapshotNow, 1, key); + NULL, 1, key); /* * Scan over the result set, removing any matching entries. @@ -1829,14 +2442,33 @@ AlterDomainDropConstraint(List *names, const char *constrName, conobj.objectId = HeapTupleGetOid(contup); conobj.objectSubId = 0; - performDeletion(&conobj, behavior); + performDeletion(&conobj, behavior, 0); + found = true; } } + + ObjectAddressSet(address, TypeRelationId, domainoid); + /* Clean up after the scan */ systable_endscan(conscan); heap_close(conrel, RowExclusiveLock); heap_close(rel, NoLock); + + if (!found) + { + if (!missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of domain \"%s\" does not exist", + constrName, TypeNameToString(typename)))); + else + ereport(NOTICE, + (errmsg("constraint \"%s\" of domain \"%s\" does not exist, skipping", + constrName, TypeNameToString(typename)))); + } + + return address; } /* @@ -1844,8 +2476,9 @@ AlterDomainDropConstraint(List *names, const char *constrName, * * Implements the ALTER DOMAIN .. ADD CONSTRAINT statement. */ -void -AlterDomainAddConstraint(List *names, Node *newConstraint) +ObjectAddress +AlterDomainAddConstraint(List *names, Node *newConstraint, + ObjectAddress *constrAddr) { TypeName *typename; Oid domainoid; @@ -1854,6 +2487,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) Form_pg_type typTup; Constraint *constr; char *ccbin; + ObjectAddress address; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); @@ -1923,13 +2557,13 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) /* * Since all other constraint types throw errors, this must be a check - * constraint. First, process the constraint expression and add an entry + * constraint. First, process the constraint expression and add an entry * to pg_constraint. */ - ccbin = domainAddConstraint(HeapTupleGetOid(tup), typTup->typnamespace, + ccbin = domainAddConstraint(domainoid, typTup->typnamespace, typTup->typbasetype, typTup->typtypmod, - constr, NameStr(typTup->typname)); + constr, NameStr(typTup->typname), constrAddr); /* * If requested to validate the constraint, test all values stored in the @@ -1938,8 +2572,12 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) if (!constr->skip_validation) validateDomainConstraint(domainoid, ccbin); + ObjectAddressSet(address, TypeRelationId, domainoid); + /* Clean up */ heap_close(typrel, RowExclusiveLock); + + return address; } /* @@ -1947,7 +2585,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) * * Implements the ALTER DOMAIN .. VALIDATE CONSTRAINT statement. */ -void +ObjectAddress AlterDomainValidateConstraint(List *names, char *constrName) { TypeName *typename; @@ -1958,13 +2596,14 @@ AlterDomainValidateConstraint(List *names, char *constrName) Form_pg_constraint con = NULL; Form_pg_constraint copy_con; char *conbin; - SysScanDesc scan; + SysScanDesc scan; Datum val; bool found = false; bool isnull; HeapTuple tuple; HeapTuple copyTuple; - ScanKeyData key; + ScanKeyData key; + ObjectAddress address; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); @@ -1989,7 +2628,7 @@ AlterDomainValidateConstraint(List *names, char *constrName) BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(domainoid)); scan = systable_beginscan(conrel, ConstraintTypidIndexId, - true, SnapshotNow, 1, &key); + true, NULL, 1, &key); while (HeapTupleIsValid(tuple = systable_getnext(scan))) { @@ -2010,8 +2649,8 @@ AlterDomainValidateConstraint(List *names, char *constrName) if (con->contype != CONSTRAINT_CHECK) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("constraint \"%s\" of domain \"%s\" is not a check constraint", - constrName, TypeNameToString(typename)))); + errmsg("constraint \"%s\" of domain \"%s\" is not a check constraint", + constrName, TypeNameToString(typename)))); val = SysCacheGetAttr(CONSTROID, tuple, Anum_pg_constraint_conbin, @@ -2031,6 +2670,12 @@ AlterDomainValidateConstraint(List *names, char *constrName) copy_con->convalidated = true; simple_heap_update(conrel, ©Tuple->t_self, copyTuple); CatalogUpdateIndexes(conrel, copyTuple); + + InvokeObjectPostAlterHook(ConstraintRelationId, + HeapTupleGetOid(copyTuple), 0); + + ObjectAddressSet(address, TypeRelationId, domainoid); + heap_freetuple(copyTuple); systable_endscan(scan); @@ -2039,6 +2684,8 @@ AlterDomainValidateConstraint(List *names, char *constrName) heap_close(conrel, RowExclusiveLock); ReleaseSysCache(tup); + + return address; } static void @@ -2070,9 +2717,11 @@ validateDomainConstraint(Oid domainoid, char *ccbin) TupleDesc tupdesc = RelationGetDescr(testrel); HeapScanDesc scan; HeapTuple tuple; + Snapshot snapshot; /* Scan all tuples in this relation */ - scan = heap_beginscan(testrel, SnapshotNow, 0, NULL); + snapshot = RegisterSnapshot(GetLatestSnapshot()); + scan = heap_beginscan(testrel, snapshot, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { int i; @@ -2095,16 +2744,28 @@ validateDomainConstraint(Oid domainoid, char *ccbin) &isNull, NULL); if (!isNull && !DatumGetBool(conResult)) + { + /* + * In principle the auxiliary information for this error + * should be errdomainconstraint(), but errtablecol() + * seems considerably more useful in practice. Since this + * code only executes in an ALTER DOMAIN command, the + * client should already know which domain is in question, + * and which constraint too. + */ ereport(ERROR, (errcode(ERRCODE_CHECK_VIOLATION), errmsg("column \"%s\" of table \"%s\" contains values that violate the new constraint", NameStr(tupdesc->attrs[attnum - 1]->attname), - RelationGetRelationName(testrel)))); + RelationGetRelationName(testrel)), + errtablecol(testrel, attnum))); + } } ResetExprContext(econtext); } heap_endscan(scan); + UnregisterSnapshot(snapshot); /* Hold relation lock till commit (XXX bad for concurrency) */ heap_close(testrel, NoLock); @@ -2112,6 +2773,7 @@ validateDomainConstraint(Oid domainoid, char *ccbin) FreeExecutorState(estate); } + /* * get_rels_with_domain * @@ -2171,7 +2833,7 @@ get_rels_with_domain(Oid domainOid, LOCKMODE lockmode) ObjectIdGetDatum(domainOid)); depScan = systable_beginscan(depRel, DependReferenceIndexId, true, - SnapshotNow, 2, key); + NULL, 2, key); while (HeapTupleIsValid(depTup = systable_getnext(depScan))) { @@ -2233,8 +2895,16 @@ get_rels_with_domain(Oid domainOid, LOCKMODE lockmode) NULL, format_type_be(domainOid)); - /* Otherwise we can ignore views, composite types, etc */ - if (rel->rd_rel->relkind != RELKIND_RELATION) + /* + * Otherwise, we can ignore relations except those with both + * storage and user-chosen column types. + * + * XXX If an index-only scan could satisfy "col::some_domain" from + * a suitable expression index, this should also check expression + * index columns. + */ + if (rel->rd_rel->relkind != RELKIND_RELATION && + rel->rd_rel->relkind != RELKIND_MATVIEW) { relation_close(rel, lockmode); continue; @@ -2250,7 +2920,7 @@ get_rels_with_domain(Oid domainOid, LOCKMODE lockmode) /* * Confirm column has not been dropped, and is of the expected type. - * This defends against an ALTER DROP COLUMN occuring just before we + * This defends against an ALTER DROP COLUMN occurring just before we * acquired lock ... but if the whole table were dropped, we'd still * have a problem. */ @@ -2261,7 +2931,7 @@ get_rels_with_domain(Oid domainOid, LOCKMODE lockmode) continue; /* - * Okay, add column to result. We store the columns in column-number + * Okay, add column to result. We store the columns in column-number * order; this is just a hack to improve predictability of regression * test output ... */ @@ -2289,7 +2959,7 @@ get_rels_with_domain(Oid domainOid, LOCKMODE lockmode) * Check that the type is actually a domain and that the current user * has permission to do ALTER DOMAIN on it. Throw an error if not. */ -static void +void checkDomainOwner(HeapTuple tup) { Form_pg_type typTup = (Form_pg_type) GETSTRUCT(tup); @@ -2303,8 +2973,7 @@ checkDomainOwner(HeapTuple tup) /* Permission check: must own type */ if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE, - format_type_be(HeapTupleGetOid(tup))); + aclcheck_error_type(ACLCHECK_NOT_OWNER, HeapTupleGetOid(tup)); } /* @@ -2313,13 +2982,14 @@ checkDomainOwner(HeapTuple tup) static char * domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, int typMod, Constraint *constr, - char *domainName) + char *domainName, ObjectAddress *constrAddr) { Node *expr; char *ccsrc; char *ccbin; ParseState *pstate; CoerceToDomainValue *domVal; + Oid ccoid; /* * Assign or validate constraint name @@ -2349,7 +3019,7 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, /* * Set up a CoerceToDomainValue to represent the occurrence of VALUE in - * the expression. Note that it will appear to have the type of the base + * the expression. Note that it will appear to have the type of the base * type, not the domain. This seems correct since within the check * expression, we should not assume the input value can be considered a * member of the domain. @@ -2362,7 +3032,7 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, pstate->p_value_substitute = (Node *) domVal; - expr = transformExpr(pstate, constr->raw_expr); + expr = transformExpr(pstate, constr->raw_expr, EXPR_KIND_DOMAIN_CHECK); /* * Make sure it yields a boolean result. @@ -2375,38 +3045,15 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, assign_expr_collations(pstate, expr); /* - * Make sure no outside relations are referred to. + * Domains don't allow variables (this is probably dead code now that + * add_missing_from is history, but let's be sure). */ - if (list_length(pstate->p_rtable) != 0) + if (list_length(pstate->p_rtable) != 0 || + contain_var_clause(expr)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("cannot use table references in domain check constraint"))); - /* - * Domains don't allow var clauses (this should be redundant with the - * above check, but make it anyway) - */ - if (contain_var_clause(expr)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("cannot use table references in domain check constraint"))); - - /* - * No subplans or aggregates, either... - */ - if (pstate->p_hasSubLinks) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot use subquery in check constraint"))); - if (pstate->p_hasAggs) - ereport(ERROR, - (errcode(ERRCODE_GROUPING_ERROR), - errmsg("cannot use aggregate function in check constraint"))); - if (pstate->p_hasWindowFuncs) - ereport(ERROR, - (errcode(ERRCODE_WINDOWING_ERROR), - errmsg("cannot use window function in check constraint"))); - /* * Convert to string form for storage. */ @@ -2414,44 +3061,44 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, /* * Deparse it to produce text for consrc. - * - * Since VARNOs aren't allowed in domain constraints, relation context - * isn't required as anything other than a shell. */ ccsrc = deparse_expression(expr, - deparse_context_for(domainName, - InvalidOid), - false, false); + NIL, false, false); /* * Store the constraint in pg_constraint */ - CreateConstraintEntry(constr->conname, /* Constraint Name */ - domainNamespace, /* namespace */ - CONSTRAINT_CHECK, /* Constraint Type */ - false, /* Is Deferrable */ - false, /* Is Deferred */ - !constr->skip_validation, /* Is Validated */ - InvalidOid, /* not a relation constraint */ - NULL, - 0, - domainOid, /* domain constraint */ - InvalidOid, /* no associated index */ - InvalidOid, /* Foreign key fields */ - NULL, - NULL, - NULL, - NULL, - 0, - ' ', - ' ', - ' ', - NULL, /* not an exclusion constraint */ - expr, /* Tree form of check constraint */ - ccbin, /* Binary form of check constraint */ - ccsrc, /* Source form of check constraint */ - true, /* is local */ - 0); /* inhcount */ + ccoid = + CreateConstraintEntry(constr->conname, /* Constraint Name */ + domainNamespace, /* namespace */ + CONSTRAINT_CHECK, /* Constraint Type */ + false, /* Is Deferrable */ + false, /* Is Deferred */ + !constr->skip_validation, /* Is Validated */ + InvalidOid, /* not a relation constraint */ + NULL, + 0, + domainOid, /* domain constraint */ + InvalidOid, /* no associated index */ + InvalidOid, /* Foreign key fields */ + NULL, + NULL, + NULL, + NULL, + 0, + ' ', + ' ', + ' ', + NULL, /* not an exclusion constraint */ + expr, /* Tree form of check constraint */ + ccbin, /* Binary form of check constraint */ + ccsrc, /* Source form of check constraint */ + true, /* is local */ + 0, /* inhcount */ + false, /* connoinherit */ + false); /* is_internal */ + if (constrAddr) + ObjectAddressSet(*constrAddr, ConstraintRelationId, ccoid); /* * Return the compiled constraint expression so the calling routine can @@ -2460,138 +3107,21 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, return ccbin; } -/* - * GetDomainConstraints - get a list of the current constraints of domain - * - * Returns a possibly-empty list of DomainConstraintState nodes. - * - * This is called by the executor during plan startup for a CoerceToDomain - * expression node. The given constraints will be checked for each value - * passed through the node. - * - * We allow this to be called for non-domain types, in which case the result - * is always NIL. - */ -List * -GetDomainConstraints(Oid typeOid) -{ - List *result = NIL; - bool notNull = false; - Relation conRel; - - conRel = heap_open(ConstraintRelationId, AccessShareLock); - - for (;;) - { - HeapTuple tup; - HeapTuple conTup; - Form_pg_type typTup; - ScanKeyData key[1]; - SysScanDesc scan; - - tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeOid)); - if (!HeapTupleIsValid(tup)) - elog(ERROR, "cache lookup failed for type %u", typeOid); - typTup = (Form_pg_type) GETSTRUCT(tup); - - if (typTup->typtype != TYPTYPE_DOMAIN) - { - /* Not a domain, so done */ - ReleaseSysCache(tup); - break; - } - - /* Test for NOT NULL Constraint */ - if (typTup->typnotnull) - notNull = true; - - /* Look for CHECK Constraints on this domain */ - ScanKeyInit(&key[0], - Anum_pg_constraint_contypid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(typeOid)); - - scan = systable_beginscan(conRel, ConstraintTypidIndexId, true, - SnapshotNow, 1, key); - - while (HeapTupleIsValid(conTup = systable_getnext(scan))) - { - Form_pg_constraint c = (Form_pg_constraint) GETSTRUCT(conTup); - Datum val; - bool isNull; - Expr *check_expr; - DomainConstraintState *r; - - /* Ignore non-CHECK constraints (presently, shouldn't be any) */ - if (c->contype != CONSTRAINT_CHECK) - continue; - - /* - * Not expecting conbin to be NULL, but we'll test for it anyway - */ - val = fastgetattr(conTup, Anum_pg_constraint_conbin, - conRel->rd_att, &isNull); - if (isNull) - elog(ERROR, "domain \"%s\" constraint \"%s\" has NULL conbin", - NameStr(typTup->typname), NameStr(c->conname)); - - check_expr = (Expr *) stringToNode(TextDatumGetCString(val)); - - /* ExecInitExpr assumes we've planned the expression */ - check_expr = expression_planner(check_expr); - - r = makeNode(DomainConstraintState); - r->constrainttype = DOM_CONSTRAINT_CHECK; - r->name = pstrdup(NameStr(c->conname)); - r->check_expr = ExecInitExpr(check_expr, NULL); - - /* - * use lcons() here because constraints of lower domains should be - * applied earlier. - */ - result = lcons(r, result); - } - - systable_endscan(scan); - - /* loop to next domain in stack */ - typeOid = typTup->typbasetype; - ReleaseSysCache(tup); - } - - heap_close(conRel, AccessShareLock); - - /* - * Only need to add one NOT NULL check regardless of how many domains in - * the stack request it. - */ - if (notNull) - { - DomainConstraintState *r = makeNode(DomainConstraintState); - - r->constrainttype = DOM_CONSTRAINT_NOTNULL; - r->name = pstrdup("NOT NULL"); - r->check_expr = NULL; - - /* lcons to apply the nullness check FIRST */ - result = lcons(r, result); - } - - return result; -} - /* * Execute ALTER TYPE RENAME */ -void -RenameType(List *names, const char *newTypeName) +ObjectAddress +RenameType(RenameStmt *stmt) { + List *names = stmt->object; + const char *newTypeName = stmt->newname; TypeName *typename; Oid typeOid; Relation rel; HeapTuple tup; Form_pg_type typTup; + ObjectAddress address; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); @@ -2607,8 +3137,14 @@ RenameType(List *names, const char *newTypeName) /* check permissions on type */ if (!pg_type_ownercheck(typeOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE, - format_type_be(typeOid)); + aclcheck_error_type(ACLCHECK_NOT_OWNER, typeOid); + + /* ALTER DOMAIN used on a non-domain? */ + if (stmt->renameType == OBJECT_DOMAIN && typTup->typtype != TYPTYPE_DOMAIN) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a domain", + format_type_be(typeOid)))); /* * If it's a composite type, we need to check that it really is a @@ -2638,21 +3174,23 @@ RenameType(List *names, const char *newTypeName) * RenameRelationInternal will call RenameTypeInternal automatically. */ if (typTup->typtype == TYPTYPE_COMPOSITE) - RenameRelationInternal(typTup->typrelid, newTypeName, - typTup->typnamespace); + RenameRelationInternal(typTup->typrelid, newTypeName, false); else RenameTypeInternal(typeOid, newTypeName, typTup->typnamespace); + ObjectAddressSet(address, TypeRelationId, typeOid); /* Clean up */ heap_close(rel, RowExclusiveLock); + + return address; } /* * Change the owner of a type. */ -void -AlterTypeOwner(List *names, Oid newOwnerId) +ObjectAddress +AlterTypeOwner(List *names, Oid newOwnerId, ObjectType objecttype) { TypeName *typename; Oid typeOid; @@ -2661,6 +3199,7 @@ AlterTypeOwner(List *names, Oid newOwnerId) HeapTuple newtup; Form_pg_type typTup; AclResult aclresult; + ObjectAddress address; rel = heap_open(TypeRelationId, RowExclusiveLock); @@ -2668,7 +3207,7 @@ AlterTypeOwner(List *names, Oid newOwnerId) typename = makeTypeNameFromNameList(names); /* Use LookupTypeName here so that shell types can be processed */ - tup = LookupTypeName(NULL, typename, NULL); + tup = LookupTypeName(NULL, typename, NULL, false); if (tup == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), @@ -2682,6 +3221,13 @@ AlterTypeOwner(List *names, Oid newOwnerId) tup = newtup; typTup = (Form_pg_type) GETSTRUCT(tup); + /* Don't allow ALTER DOMAIN on a type */ + if (objecttype == OBJECT_DOMAIN && typTup->typtype != TYPTYPE_DOMAIN) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("%s is not a domain", + format_type_be(typeOid)))); + /* * If it's a composite type, we need to check that it really is a * free-standing composite type, and not a table's rowtype. We want people @@ -2716,8 +3262,7 @@ AlterTypeOwner(List *names, Oid newOwnerId) { /* Otherwise, must be owner of the existing object */ if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE, - format_type_be(HeapTupleGetOid(tup))); + aclcheck_error_type(ACLCHECK_NOT_OWNER, HeapTupleGetOid(tup)); /* Must be able to become new owner */ check_is_member_of_role(GetUserId(), newOwnerId); @@ -2731,53 +3276,30 @@ AlterTypeOwner(List *names, Oid newOwnerId) get_namespace_name(typTup->typnamespace)); } - /* - * If it's a composite type, invoke ATExecChangeOwner so that we fix - * up the pg_class entry properly. That will call back to - * AlterTypeOwnerInternal to take care of the pg_type entry(s). - */ - if (typTup->typtype == TYPTYPE_COMPOSITE) - ATExecChangeOwner(typTup->typrelid, newOwnerId, true, AccessExclusiveLock); - else - { - /* - * We can just apply the modification directly. - * - * okay to scribble on typTup because it's a copy - */ - typTup->typowner = newOwnerId; - - simple_heap_update(rel, &tup->t_self, tup); - - CatalogUpdateIndexes(rel, tup); - - /* Update owner dependency reference */ - changeDependencyOnOwner(TypeRelationId, typeOid, newOwnerId); - - /* If it has an array type, update that too */ - if (OidIsValid(typTup->typarray)) - AlterTypeOwnerInternal(typTup->typarray, newOwnerId, false); - } + AlterTypeOwner_oid(typeOid, newOwnerId, true); } + ObjectAddressSet(address, TypeRelationId, typeOid); + /* Clean up */ heap_close(rel, RowExclusiveLock); + + return address; } /* - * AlterTypeOwnerInternal - change type owner unconditionally + * AlterTypeOwner_oid - change type owner unconditionally * - * This is currently only used to propagate ALTER TABLE/TYPE OWNER to a - * table's rowtype or an array type, and to implement REASSIGN OWNED BY. - * It assumes the caller has done all needed checks. The function will - * automatically recurse to an array type if the type has one. + * This function recurses to handle a pg_class entry, if necessary. It + * invokes any necessary access object hooks. If hasDependEntry is TRUE, this + * function modifies the pg_shdepend entry appropriately (this should be + * passed as FALSE only for table rowtypes and array types). * - * hasDependEntry should be TRUE if type is expected to have a pg_shdepend - * entry (ie, it's not a table rowtype nor an array type). + * This is used by ALTER TABLE/TYPE OWNER commands, as well as by REASSIGN + * OWNED BY. It assumes the caller has done all needed check. */ void -AlterTypeOwnerInternal(Oid typeOid, Oid newOwnerId, - bool hasDependEntry) +AlterTypeOwner_oid(Oid typeOid, Oid newOwnerId, bool hasDependEntry) { Relation rel; HeapTuple tup; @@ -2785,27 +3307,86 @@ AlterTypeOwnerInternal(Oid typeOid, Oid newOwnerId, rel = heap_open(TypeRelationId, RowExclusiveLock); - tup = SearchSysCacheCopy1(TYPEOID, ObjectIdGetDatum(typeOid)); + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeOid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", typeOid); typTup = (Form_pg_type) GETSTRUCT(tup); /* - * Modify the owner --- okay to scribble on typTup because it's a copy + * If it's a composite type, invoke ATExecChangeOwner so that we fix up the + * pg_class entry properly. That will call back to AlterTypeOwnerInternal + * to take care of the pg_type entry(s). */ - typTup->typowner = newOwnerId; + if (typTup->typtype == TYPTYPE_COMPOSITE) + ATExecChangeOwner(typTup->typrelid, newOwnerId, true, AccessExclusiveLock); + else + AlterTypeOwnerInternal(typeOid, newOwnerId); + + /* Update owner dependency reference */ + if (hasDependEntry) + changeDependencyOnOwner(TypeRelationId, typeOid, newOwnerId); + + InvokeObjectPostAlterHook(TypeRelationId, typeOid, 0); + + ReleaseSysCache(tup); + heap_close(rel, RowExclusiveLock); +} + +/* + * AlterTypeOwnerInternal - bare-bones type owner change. + * + * This routine simply modifies the owner of a pg_type entry, and recurses + * to handle a possible array type. + */ +void +AlterTypeOwnerInternal(Oid typeOid, Oid newOwnerId) +{ + Relation rel; + HeapTuple tup; + Form_pg_type typTup; + Datum repl_val[Natts_pg_type]; + bool repl_null[Natts_pg_type]; + bool repl_repl[Natts_pg_type]; + Acl *newAcl; + Datum aclDatum; + bool isNull; + + rel = heap_open(TypeRelationId, RowExclusiveLock); + + tup = SearchSysCacheCopy1(TYPEOID, ObjectIdGetDatum(typeOid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", typeOid); + typTup = (Form_pg_type) GETSTRUCT(tup); + + memset(repl_null, false, sizeof(repl_null)); + memset(repl_repl, false, sizeof(repl_repl)); + + repl_repl[Anum_pg_type_typowner - 1] = true; + repl_val[Anum_pg_type_typowner - 1] = ObjectIdGetDatum(newOwnerId); + + aclDatum = heap_getattr(tup, + Anum_pg_type_typacl, + RelationGetDescr(rel), + &isNull); + /* Null ACLs do not require changes */ + if (!isNull) + { + newAcl = aclnewowner(DatumGetAclP(aclDatum), + typTup->typowner, newOwnerId); + repl_repl[Anum_pg_type_typacl - 1] = true; + repl_val[Anum_pg_type_typacl - 1] = PointerGetDatum(newAcl); + } + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val, repl_null, + repl_repl); simple_heap_update(rel, &tup->t_self, tup); CatalogUpdateIndexes(rel, tup); - /* Update owner dependency reference, if it has one */ - if (hasDependEntry) - changeDependencyOnOwner(TypeRelationId, typeOid, newOwnerId); - /* If it has an array type, update that too */ if (OidIsValid(typTup->typarray)) - AlterTypeOwnerInternal(typTup->typarray, newOwnerId, false); + AlterTypeOwnerInternal(typTup->typarray, newOwnerId); /* Clean up */ heap_close(rel, RowExclusiveLock); @@ -2814,32 +3395,51 @@ AlterTypeOwnerInternal(Oid typeOid, Oid newOwnerId, /* * Execute ALTER TYPE SET SCHEMA */ -void -AlterTypeNamespace(List *names, const char *newschema) +ObjectAddress +AlterTypeNamespace(List *names, const char *newschema, ObjectType objecttype, + Oid *oldschema) { TypeName *typename; Oid typeOid; Oid nspOid; + Oid oldNspOid; + ObjectAddresses *objsMoved; + ObjectAddress myself; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); typeOid = typenameTypeId(NULL, typename); + /* Don't allow ALTER DOMAIN on a type */ + if (objecttype == OBJECT_DOMAIN && get_typtype(typeOid) != TYPTYPE_DOMAIN) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("%s is not a domain", + format_type_be(typeOid)))); + /* get schema OID and check its permissions */ nspOid = LookupCreationNamespace(newschema); - AlterTypeNamespace_oid(typeOid, nspOid); + objsMoved = new_object_addresses(); + oldNspOid = AlterTypeNamespace_oid(typeOid, nspOid, objsMoved); + free_object_addresses(objsMoved); + + if (oldschema) + *oldschema = oldNspOid; + + ObjectAddressSet(myself, TypeRelationId, typeOid); + + return myself; } Oid -AlterTypeNamespace_oid(Oid typeOid, Oid nspOid) +AlterTypeNamespace_oid(Oid typeOid, Oid nspOid, ObjectAddresses *objsMoved) { Oid elemOid; /* check permissions on type */ if (!pg_type_ownercheck(typeOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE, - format_type_be(typeOid)); + aclcheck_error_type(ACLCHECK_NOT_OWNER, typeOid); /* don't allow direct alteration of array types */ elemOid = get_element_type(typeOid); @@ -2852,7 +3452,7 @@ AlterTypeNamespace_oid(Oid typeOid, Oid nspOid) format_type_be(elemOid)))); /* and do the work */ - return AlterTypeNamespaceInternal(typeOid, nspOid, false, true); + return AlterTypeNamespaceInternal(typeOid, nspOid, false, true, objsMoved); } /* @@ -2861,7 +3461,7 @@ AlterTypeNamespace_oid(Oid typeOid, Oid nspOid) * Caller must have already checked privileges. * * The function automatically recurses to process the type's array type, - * if any. isImplicitArray should be TRUE only when doing this internal + * if any. isImplicitArray should be TRUE only when doing this internal * recursion (outside callers must never try to move an array type directly). * * If errorOnTableType is TRUE, the function errors out if the type is @@ -2873,7 +3473,8 @@ AlterTypeNamespace_oid(Oid typeOid, Oid nspOid) Oid AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid, bool isImplicitArray, - bool errorOnTableType) + bool errorOnTableType, + ObjectAddresses *objsMoved) { Relation rel; HeapTuple tup; @@ -2881,6 +3482,17 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid, Oid oldNspOid; Oid arrayOid; bool isCompositeType; + ObjectAddress thisobj; + + /* + * Make sure we haven't moved this object previously. + */ + thisobj.classId = TypeRelationId; + thisobj.objectId = typeOid; + thisobj.objectSubId = 0; + + if (object_address_present(&thisobj, objsMoved)) + return InvalidOid; rel = heap_open(TypeRelationId, RowExclusiveLock); @@ -2892,18 +3504,22 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid, oldNspOid = typform->typnamespace; arrayOid = typform->typarray; - /* common checks on switching namespaces */ - CheckSetNamespace(oldNspOid, nspOid, TypeRelationId, typeOid); + /* If the type is already there, we scan skip these next few checks. */ + if (oldNspOid != nspOid) + { + /* common checks on switching namespaces */ + CheckSetNamespace(oldNspOid, nspOid); - /* check for duplicate name (more friendly than unique-index failure) */ - if (SearchSysCacheExists2(TYPENAMENSP, - CStringGetDatum(NameStr(typform->typname)), - ObjectIdGetDatum(nspOid))) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("type \"%s\" already exists in schema \"%s\"", - NameStr(typform->typname), - get_namespace_name(nspOid)))); + /* check for duplicate name (more friendly than unique-index failure) */ + if (SearchSysCacheExists2(TYPENAMENSP, + CStringGetDatum(NameStr(typform->typname)), + ObjectIdGetDatum(nspOid))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("type \"%s\" already exists in schema \"%s\"", + NameStr(typform->typname), + get_namespace_name(nspOid)))); + } /* Detect whether type is a composite type (but not a table rowtype) */ isCompositeType = @@ -2919,13 +3535,16 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid, format_type_be(typeOid)), errhint("Use ALTER TABLE instead."))); - /* OK, modify the pg_type row */ + if (oldNspOid != nspOid) + { + /* OK, modify the pg_type row */ - /* tup is a copy, so we can scribble directly on it */ - typform->typnamespace = nspOid; + /* tup is a copy, so we can scribble directly on it */ + typform->typnamespace = nspOid; - simple_heap_update(rel, &tup->t_self, tup); - CatalogUpdateIndexes(rel, tup); + simple_heap_update(rel, &tup->t_self, tup); + CatalogUpdateIndexes(rel, tup); + } /* * Composite types have pg_class entries. @@ -2941,7 +3560,7 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid, AlterRelationNamespaceInternal(classRel, typform->typrelid, oldNspOid, nspOid, - false); + false, objsMoved); heap_close(classRel, RowExclusiveLock); @@ -2950,33 +3569,39 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid, * currently support this, but probably will someday). */ AlterConstraintNamespaces(typform->typrelid, oldNspOid, - nspOid, false); + nspOid, false, objsMoved); } else { /* If it's a domain, it might have constraints */ if (typform->typtype == TYPTYPE_DOMAIN) - AlterConstraintNamespaces(typeOid, oldNspOid, nspOid, true); + AlterConstraintNamespaces(typeOid, oldNspOid, nspOid, true, + objsMoved); } /* * Update dependency on schema, if any --- a table rowtype has not got * one, and neither does an implicit array. */ - if ((isCompositeType || typform->typtype != TYPTYPE_COMPOSITE) && + if (oldNspOid != nspOid && + (isCompositeType || typform->typtype != TYPTYPE_COMPOSITE) && !isImplicitArray) if (changeDependencyFor(TypeRelationId, typeOid, NamespaceRelationId, oldNspOid, nspOid) != 1) elog(ERROR, "failed to change schema dependency for type %s", format_type_be(typeOid)); + InvokeObjectPostAlterHook(TypeRelationId, typeOid, 0); + heap_freetuple(tup); heap_close(rel, RowExclusiveLock); + add_exact_object_address(&thisobj, objsMoved); + /* Recursively alter the associated array type, if any */ if (OidIsValid(arrayOid)) - AlterTypeNamespaceInternal(arrayOid, nspOid, true, true); + AlterTypeNamespaceInternal(arrayOid, nspOid, true, true, objsMoved); return oldNspOid; }