]> granicus.if.org Git - postgresql/commitdiff
Improve handling of domains over arrays.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 21 Oct 2010 20:07:17 +0000 (16:07 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 21 Oct 2010 20:07:17 +0000 (16:07 -0400)
This patch eliminates various bizarre behaviors caused by sloppy thinking
about the difference between a domain type and its underlying array type.
In particular, the operation of updating one element of such an array
has to be considered as yielding a value of the underlying array type,
*not* a value of the domain, because there's no assurance that the
domain's CHECK constraints are still satisfied.  If we're intending to
store the result back into a domain column, we have to re-cast to the
domain type so that constraints are re-checked.

For similar reasons, such a domain can't be blindly matched to an ANYARRAY
polymorphic parameter, because the polymorphic function is likely to apply
array-ish operations that could invalidate the domain constraints.  For the
moment, we just forbid such matching.  We might later wish to insert an
automatic downcast to the underlying array type, but such a change should
also change matching of domains to ANYELEMENT for consistency.

To ensure that all such logic is rechecked, this patch removes the original
hack of setting a domain's pg_type.typelem field to match its base type;
the typelem will always be zero instead.  In those places where it's really
okay to look through the domain type with no other logic changes, use the
newly added get_base_element_type function in place of get_element_type.
catversion bumped due to change in pg_type contents.

Per bug #5717 from Richard Huxton and subsequent discussion.

21 files changed:
doc/src/sgml/catalogs.sgml
src/backend/commands/functioncmds.c
src/backend/commands/typecmds.c
src/backend/parser/parse_coerce.c
src/backend/parser/parse_expr.c
src/backend/parser/parse_node.c
src/backend/parser/parse_oper.c
src/backend/parser/parse_target.c
src/backend/utils/adt/format_type.c
src/backend/utils/adt/ruleutils.c
src/backend/utils/adt/selfuncs.c
src/backend/utils/adt/xml.c
src/backend/utils/cache/lsyscache.c
src/backend/utils/fmgr/fmgr.c
src/include/catalog/catversion.h
src/include/catalog/pg_type.h
src/include/parser/parse_node.h
src/include/utils/lsyscache.h
src/pl/plpgsql/src/pl_exec.c
src/test/regress/expected/domain.out
src/test/regress/sql/domain.sql

index bf695ed78f84b8ceeb5544f033b8bd4671e3e1e6..b7b48e4fb93c26c421cea5c6fd01a914f9ccc8b5 100644 (file)
       <entry></entry>
       <entry><para>
        <structfield>typndims</structfield> is the number of array dimensions
-       for a domain that is an array (that is, <structfield>typbasetype</> is
-       an array type; the domain's <structfield>typelem</> will match the base
-       type's <structfield>typelem</structfield>).
+       for a domain over an array (that is, <structfield>typbasetype</> is
+       an array type).
        Zero for types other than domains over array types.
        </para></entry>
      </row>
index bd977d2b3cbde6cdcbfe45b72d64d560e9982fba..e10d4fb01512933aea6f924dee58c7fad70e2711 100644 (file)
@@ -1657,6 +1657,23 @@ CreateCast(CreateCastStmt *stmt)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
                                         errmsg("array data types are not binary-compatible")));
+
+               /*
+                * We also disallow creating binary-compatibility casts involving
+                * domains.  Casting from a domain to its base type is already
+                * allowed, and casting the other way ought to go through domain
+                * coercion to permit constraint checking.  Again, if you're intent on
+                * having your own semantics for that, create a no-op cast function.
+                *
+                * NOTE: if we were to relax this, the above checks for composites
+                * etc. would have to be modified to look through domains to their
+                * base types.
+                */
+               if (sourcetyptype == TYPTYPE_DOMAIN ||
+                       targettyptype == TYPTYPE_DOMAIN)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                                        errmsg("domain data types must not be marked binary-compatible")));
        }
 
        /*
index 25503bda4f48661575ea5782c088b41dd92b8831..46b156e09a30ff264e6cf2e7567a2beb8cda43cb 100644 (file)
@@ -525,14 +525,12 @@ DefineType(List *names, List *parameters)
 
        /*
         * now have TypeCreate do all the real work.
+        *
+        * Note: the pg_type.oid is stored in user tables as array elements (base
+        * types) in ArrayType and in composite types in DatumTupleFields.  This
+        * oid must be preserved by binary upgrades.
         */
        typoid =
-
-       /*
-        * The pg_type.oid is stored in user tables as array elements (base types)
-        * in ArrayType and in composite types in DatumTupleFields.  This oid must
-        * be preserved by binary upgrades.
-        */
                TypeCreate(InvalidOid,  /* no predetermined type OID */
                                   typeName,    /* type name */
                                   typeNamespace,               /* namespace */
@@ -746,7 +744,6 @@ DefineDomain(CreateDomainStmt *stmt)
        Oid                     sendProcedure;
        Oid                     analyzeProcedure;
        bool            byValue;
-       Oid                     typelem;
        char            category;
        char            delimiter;
        char            alignment;
@@ -831,9 +828,6 @@ DefineDomain(CreateDomainStmt *stmt)
        /* Type Category */
        category = baseType->typcategory;
 
-       /* Array element type (in case base type is an array) */
-       typelem = baseType->typelem;
-
        /* Array element Delimiter */
        delimiter = baseType->typdelim;
 
@@ -1033,7 +1027,7 @@ DefineDomain(CreateDomainStmt *stmt)
                                   InvalidOid,  /* typmodin procedure - none */
                                   InvalidOid,  /* typmodout procedure - none */
                                   analyzeProcedure,    /* analyze procedure */
-                                  typelem,             /* element type ID */
+                                  InvalidOid,  /* no array element type */
                                   false,               /* this isn't an array */
                                   InvalidOid,  /* no arrays for domains (yet) */
                                   basetypeoid, /* base type ID */
@@ -1670,7 +1664,7 @@ AlterDomainDefault(List *names, Node *defaultRaw)
                                                         typTup->typmodin,
                                                         typTup->typmodout,
                                                         typTup->typanalyze,
-                                                        typTup->typelem,
+                                                        InvalidOid,
                                                         false,         /* a domain isn't an implicit array */
                                                         typTup->typbasetype,
                                                         defaultExpr,
index eddbd3197762416a4b9433bac0ea3c3d42798e5f..4eb48ff8b1dc7d32af9b1296a8b310dda5a99ccb 100644 (file)
@@ -1908,6 +1908,9 @@ find_coercion_pathway(Oid targetTypeId, Oid sourceTypeId,
                 * array types.  If so, and if the element types have a suitable cast,
                 * report that we can coerce with an ArrayCoerceExpr.
                 *
+                * Note that the source type can be a domain over array, but not the
+                * target, because ArrayCoerceExpr won't check domain constraints.
+                *
                 * Hack: disallow coercions to oidvector and int2vector, which
                 * otherwise tend to capture coercions that should go to "real" array
                 * types.  We want those types to be considered "real" arrays for many
@@ -1921,7 +1924,7 @@ find_coercion_pathway(Oid targetTypeId, Oid sourceTypeId,
                        Oid                     sourceElem;
 
                        if ((targetElem = get_element_type(targetTypeId)) != InvalidOid &&
-                               (sourceElem = get_element_type(sourceTypeId)) != InvalidOid)
+                               (sourceElem = get_base_element_type(sourceTypeId)) != InvalidOid)
                        {
                                CoercionPathType elempathtype;
                                Oid                     elemfuncid;
@@ -2001,10 +2004,8 @@ find_typmod_coercion_function(Oid typeId,
        targetType = typeidType(typeId);
        typeForm = (Form_pg_type) GETSTRUCT(targetType);
 
-       /* Check for a varlena array type (and not a domain) */
-       if (typeForm->typelem != InvalidOid &&
-               typeForm->typlen == -1 &&
-               typeForm->typtype != TYPTYPE_DOMAIN)
+       /* Check for a varlena array type */
+       if (typeForm->typelem != InvalidOid && typeForm->typlen == -1)
        {
                /* Yes, switch our attention to the element type */
                typeId = typeForm->typelem;
index addd0d4fffe769f21c6f03b540819c6987c61a49..4b5cf3e3db648e6cd00daba73cc78ff328b5b704 100644 (file)
@@ -161,19 +161,17 @@ transformExpr(ParseState *pstate, Node *expr)
 
                                        targetType = typenameTypeId(pstate, tc->typeName,
                                                                                                &targetTypmod);
+                                       /*
+                                        * If target is a domain over array, work with the base
+                                        * array type here.  transformTypeCast below will cast the
+                                        * array type to the domain.  In the usual case that the
+                                        * target is not a domain, transformTypeCast is a no-op.
+                                        */
+                                       targetType = getBaseTypeAndTypmod(targetType,
+                                                                                                         &targetTypmod);
                                        elementType = get_element_type(targetType);
                                        if (OidIsValid(elementType))
                                        {
-                                               /*
-                                                * tranformArrayExpr doesn't know how to check domain
-                                                * constraints, so ask it to return the base type
-                                                * instead. transformTypeCast below will cast it to
-                                                * the domain. In the usual case that the target is
-                                                * not a domain, transformTypeCast is a no-op.
-                                                */
-                                               targetType = getBaseTypeAndTypmod(targetType,
-                                                                                                                 &targetTypmod);
-
                                                tc = copyObject(tc);
                                                tc->arg = transformArrayExpr(pstate,
                                                                                                         (A_ArrayExpr *) tc->arg,
index 8f7b8dc8fb99bb77d96ad1bbc11f61fa2445049c..0f0a188eec4ac5b591f3ffe8573c82cc7930223d 100644 (file)
@@ -25,6 +25,7 @@
 #include "parser/parse_relation.h"
 #include "utils/builtins.h"
 #include "utils/int8.h"
+#include "utils/lsyscache.h"
 #include "utils/syscache.h"
 #include "utils/varbit.h"
 
@@ -198,19 +199,35 @@ make_var(ParseState *pstate, RangeTblEntry *rte, int attrno, int location)
 
 /*
  * transformArrayType()
- *             Get the element type of an array type in preparation for subscripting
+ *             Identify the types involved in a subscripting operation
+ *
+ * On entry, arrayType/arrayTypmod identify the type of the input value
+ * to be subscripted (which could be a domain type).  These are modified
+ * if necessary to identify the actual array type and typmod, and the
+ * array's element type is returned.  An error is thrown if the input isn't
+ * an array type.
  */
 Oid
-transformArrayType(Oid arrayType)
+transformArrayType(Oid *arrayType, int32 *arrayTypmod)
 {
+       Oid                     origArrayType = *arrayType;
        Oid                     elementType;
        HeapTuple       type_tuple_array;
        Form_pg_type type_struct_array;
 
+       /*
+        * If the input is a domain, smash to base type, and extract the actual
+        * typmod to be applied to the base type.  Subscripting a domain is an
+        * operation that necessarily works on the base array type, not the domain
+        * itself.  (Note that we provide no method whereby the creator of a
+        * domain over an array type could hide its ability to be subscripted.)
+        */
+       *arrayType = getBaseTypeAndTypmod(*arrayType, arrayTypmod);
+
        /* Get the type tuple for the array */
-       type_tuple_array = SearchSysCache1(TYPEOID, ObjectIdGetDatum(arrayType));
+       type_tuple_array = SearchSysCache1(TYPEOID, ObjectIdGetDatum(*arrayType));
        if (!HeapTupleIsValid(type_tuple_array))
-               elog(ERROR, "cache lookup failed for type %u", arrayType);
+               elog(ERROR, "cache lookup failed for type %u", *arrayType);
        type_struct_array = (Form_pg_type) GETSTRUCT(type_tuple_array);
 
        /* needn't check typisdefined since this will fail anyway */
@@ -220,7 +237,7 @@ transformArrayType(Oid arrayType)
                ereport(ERROR,
                                (errcode(ERRCODE_DATATYPE_MISMATCH),
                                 errmsg("cannot subscript type %s because it is not an array",
-                                               format_type_be(arrayType))));
+                                               format_type_be(origArrayType))));
 
        ReleaseSysCache(type_tuple_array);
 
@@ -241,13 +258,17 @@ transformArrayType(Oid arrayType)
  * that array. We produce an expression that represents the new array value
  * with the source data inserted into the right part of the array.
  *
+ * For both cases, if the source array is of a domain-over-array type,
+ * the result is of the base array type or its element type; essentially,
+ * we must fold a domain to its base type before applying subscripting.
+ *
  * pstate              Parse state
  * arrayBase   Already-transformed expression for the array as a whole
- * arrayType   OID of array's datatype (should match type of arrayBase)
+ * arrayType   OID of array's datatype (should match type of arrayBase,
+ *                             or be the base type of arrayBase's domain type)
  * elementType OID of array's element type (fetch with transformArrayType,
  *                             or pass InvalidOid to do it here)
- * elementTypMod typmod to be applied to array elements (if storing) or of
- *                             the source array (if fetching)
+ * arrayTypMod typmod for the array (which is also typmod for the elements)
  * indirection Untransformed list of subscripts (must not be NIL)
  * assignFrom  NULL for array fetch, else transformed expression for source.
  */
@@ -256,7 +277,7 @@ transformArraySubscripts(ParseState *pstate,
                                                 Node *arrayBase,
                                                 Oid arrayType,
                                                 Oid elementType,
-                                                int32 elementTypMod,
+                                                int32 arrayTypMod,
                                                 List *indirection,
                                                 Node *assignFrom)
 {
@@ -266,9 +287,13 @@ transformArraySubscripts(ParseState *pstate,
        ListCell   *idx;
        ArrayRef   *aref;
 
-       /* Caller may or may not have bothered to determine elementType */
+       /*
+        * Caller may or may not have bothered to determine elementType.  Note
+        * that if the caller did do so, arrayType/arrayTypMod must be as
+        * modified by transformArrayType, ie, smash domain to base type.
+        */
        if (!OidIsValid(elementType))
-               elementType = transformArrayType(arrayType);
+               elementType = transformArrayType(&arrayType, &arrayTypMod);
 
        /*
         * A list containing only single subscripts refers to a single array
@@ -356,7 +381,7 @@ transformArraySubscripts(ParseState *pstate,
 
                newFrom = coerce_to_target_type(pstate,
                                                                                assignFrom, typesource,
-                                                                               typeneeded, elementTypMod,
+                                                                               typeneeded, arrayTypMod,
                                                                                COERCION_ASSIGNMENT,
                                                                                COERCE_IMPLICIT_CAST,
                                                                                -1);
@@ -378,7 +403,7 @@ transformArraySubscripts(ParseState *pstate,
        aref = makeNode(ArrayRef);
        aref->refarraytype = arrayType;
        aref->refelemtype = elementType;
-       aref->reftypmod = elementTypMod;
+       aref->reftypmod = arrayTypMod;
        aref->refupperindexpr = upperIndexpr;
        aref->reflowerindexpr = lowerIndexpr;
        aref->refexpr = (Expr *) arrayBase;
index 1f9742b3adad232e68f30f8e72f11a553137cd48..8d77590fac982ba410bf3c2bf85afbd7b1ad7cbe 100644 (file)
@@ -209,7 +209,7 @@ get_sort_group_operators(Oid argtype,
                eq_opr == ARRAY_EQ_OP ||
                gt_opr == ARRAY_GT_OP)
        {
-               Oid                     elem_type = get_element_type(argtype);
+               Oid                     elem_type = get_base_element_type(argtype);
 
                if (OidIsValid(elem_type))
                {
@@ -906,7 +906,7 @@ make_scalar_array_op(ParseState *pstate, List *opname,
                rtypeId = UNKNOWNOID;
        else
        {
-               rtypeId = get_element_type(atypeId);
+               rtypeId = get_base_element_type(atypeId);
                if (!OidIsValid(rtypeId))
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
index e93c0afe915b08b1ce68482ebee7c8d5ecac7749..c777484d45d7d80550865ace7741c5415b06cab6 100644 (file)
@@ -43,6 +43,16 @@ static Node *transformAssignmentIndirection(ParseState *pstate,
                                                           ListCell *indirection,
                                                           Node *rhs,
                                                           int location);
+static Node *transformAssignmentSubscripts(ParseState *pstate,
+                                                         Node *basenode,
+                                                         const char *targetName,
+                                                         Oid targetTypeId,
+                                                         int32 targetTypMod,
+                                                         List *subscripts,
+                                                         bool isSlice,
+                                                         ListCell *next_indirection,
+                                                         Node *rhs,
+                                                         int location);
 static List *ExpandColumnRefStar(ParseState *pstate, ColumnRef *cref,
                                        bool targetlist);
 static List *ExpandAllTables(ParseState *pstate, int location);
@@ -613,27 +623,17 @@ transformAssignmentIndirection(ParseState *pstate,
                        /* process subscripts before this field selection */
                        if (subscripts)
                        {
-                               Oid                     elementTypeId = transformArrayType(targetTypeId);
-                               Oid                     typeNeeded = isSlice ? targetTypeId : elementTypeId;
-
-                               /* recurse to create appropriate RHS for array assign */
-                               rhs = transformAssignmentIndirection(pstate,
-                                                                                                        NULL,
+                               /* recurse, and then return because we're done */
+                               return transformAssignmentSubscripts(pstate,
+                                                                                                        basenode,
                                                                                                         targetName,
-                                                                                                        true,
-                                                                                                        typeNeeded,
+                                                                                                        targetTypeId,
                                                                                                         targetTypMod,
+                                                                                                        subscripts,
+                                                                                                        isSlice,
                                                                                                         i,
                                                                                                         rhs,
                                                                                                         location);
-                               /* process subscripts */
-                               return (Node *) transformArraySubscripts(pstate,
-                                                                                                                basenode,
-                                                                                                                targetTypeId,
-                                                                                                                elementTypeId,
-                                                                                                                targetTypMod,
-                                                                                                                subscripts,
-                                                                                                                rhs);
                        }
 
                        /* No subscripts, so can process field selection here */
@@ -690,27 +690,17 @@ transformAssignmentIndirection(ParseState *pstate,
        /* process trailing subscripts, if any */
        if (subscripts)
        {
-               Oid                     elementTypeId = transformArrayType(targetTypeId);
-               Oid                     typeNeeded = isSlice ? targetTypeId : elementTypeId;
-
-               /* recurse to create appropriate RHS for array assign */
-               rhs = transformAssignmentIndirection(pstate,
-                                                                                        NULL,
+               /* recurse, and then return because we're done */
+               return transformAssignmentSubscripts(pstate,
+                                                                                        basenode,
                                                                                         targetName,
-                                                                                        true,
-                                                                                        typeNeeded,
+                                                                                        targetTypeId,
                                                                                         targetTypMod,
+                                                                                        subscripts,
+                                                                                        isSlice,
                                                                                         NULL,
                                                                                         rhs,
                                                                                         location);
-               /* process subscripts */
-               return (Node *) transformArraySubscripts(pstate,
-                                                                                                basenode,
-                                                                                                targetTypeId,
-                                                                                                elementTypeId,
-                                                                                                targetTypMod,
-                                                                                                subscripts,
-                                                                                                rhs);
        }
 
        /* base case: just coerce RHS to match target type ID */
@@ -748,6 +738,79 @@ transformAssignmentIndirection(ParseState *pstate,
        return result;
 }
 
+/*
+ * helper for transformAssignmentIndirection: process array assignment
+ */
+static Node *
+transformAssignmentSubscripts(ParseState *pstate,
+                                                         Node *basenode,
+                                                         const char *targetName,
+                                                         Oid targetTypeId,
+                                                         int32 targetTypMod,
+                                                         List *subscripts,
+                                                         bool isSlice,
+                                                         ListCell *next_indirection,
+                                                         Node *rhs,
+                                                         int location)
+{
+       Node       *result;
+       Oid                     arrayType;
+       int32           arrayTypMod;
+       Oid                     elementTypeId;
+       Oid                     typeNeeded;
+
+       Assert(subscripts != NIL);
+
+       /* Identify the actual array type and element type involved */
+       arrayType = targetTypeId;
+       arrayTypMod = targetTypMod;
+       elementTypeId = transformArrayType(&arrayType, &arrayTypMod);
+
+       /* Identify type that RHS must provide */
+       typeNeeded = isSlice ? arrayType : elementTypeId;
+
+       /* recurse to create appropriate RHS for array assign */
+       rhs = transformAssignmentIndirection(pstate,
+                                                                                NULL,
+                                                                                targetName,
+                                                                                true,
+                                                                                typeNeeded,
+                                                                                arrayTypMod,
+                                                                                next_indirection,
+                                                                                rhs,
+                                                                                location);
+
+       /* process subscripts */
+       result = (Node *) transformArraySubscripts(pstate,
+                                                                                          basenode,
+                                                                                          arrayType,
+                                                                                          elementTypeId,
+                                                                                          arrayTypMod,
+                                                                                          subscripts,
+                                                                                          rhs);
+
+       /* If target was a domain over array, need to coerce up to the domain */
+       if (arrayType != targetTypeId)
+       {
+               result = coerce_to_target_type(pstate,
+                                                                          result, exprType(result),
+                                                                          targetTypeId, targetTypMod,
+                                                                          COERCION_ASSIGNMENT,
+                                                                          COERCE_IMPLICIT_CAST,
+                                                                          -1);
+               /* probably shouldn't fail, but check */
+               if (result == NULL)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_CANNOT_COERCE),
+                                        errmsg("cannot cast type %s to %s",
+                                                       format_type_be(exprType(result)),
+                                                       format_type_be(targetTypeId)),
+                                        parser_errposition(pstate, location)));
+       }
+
+       return result;
+}
+
 
 /*
  * checkInsertTargets -
index 8fd551ef841cb90cb4075529fcb236f91d5da660..f6f5efe1263922b89658392a13f97c14c64539d0 100644 (file)
@@ -134,18 +134,16 @@ format_type_internal(Oid type_oid, int32 typemod,
        typeform = (Form_pg_type) GETSTRUCT(tuple);
 
        /*
-        * Check if it's an array (and not a domain --- we don't want to show the
-        * substructure of a domain type).      Fixed-length array types such as
-        * "name" shouldn't get deconstructed either.  As of Postgres 8.1, rather
-        * than checking typlen we check the toast property, and don't deconstruct
-        * "plain storage" array types --- this is because we don't want to show
-        * oidvector as oid[].
+        * Check if it's a regular (variable length) array type.  Fixed-length
+        * array types such as "name" shouldn't get deconstructed.  As of Postgres
+        * 8.1, rather than checking typlen we check the toast property, and don't
+        * deconstruct "plain storage" array types --- this is because we don't
+        * want to show oidvector as oid[].
         */
        array_base_type = typeform->typelem;
 
        if (array_base_type != InvalidOid &&
-               typeform->typstorage != 'p' &&
-               typeform->typtype != TYPTYPE_DOMAIN)
+               typeform->typstorage != 'p')
        {
                /* Switch our attention to the array element type */
                ReleaseSysCache(tuple);
index 22ba948e7328ee3849ac89bc63d2148cbdd2f560..d4279c0f4e546fcf50435fea7c7e713f45068d50 100644 (file)
@@ -4850,7 +4850,7 @@ get_rule_expr(Node *node, deparse_context *context,
                                appendStringInfo(buf, " %s %s (",
                                                                 generate_operator_name(expr->opno,
                                                                                                                exprType(arg1),
-                                                                                  get_element_type(exprType(arg2))),
+                                                                          get_base_element_type(exprType(arg2))),
                                                                 expr->useOr ? "ANY" : "ALL");
                                get_rule_expr_paren(arg2, context, true, node);
                                appendStringInfoChar(buf, ')');
index ce6d4e2a79b5cb2048e7f7932649ab721af9d029..c7442218a84a12972658ef320c20c7aa783b80a7 100644 (file)
@@ -1704,7 +1704,7 @@ scalararraysel(PlannerInfo *root,
        rightop = (Node *) lsecond(clause->args);
 
        /* get nominal (after relabeling) element type of rightop */
-       nominal_element_type = get_element_type(exprType(rightop));
+       nominal_element_type = get_base_element_type(exprType(rightop));
        if (!OidIsValid(nominal_element_type))
                return (Selectivity) 0.5;               /* probably shouldn't happen */
 
index a94a457cac6f4e15471775880e8a21b2fc6f061d..6e9c7fe2b065781f7216ce28eebcb2b3751a58e9 100644 (file)
@@ -1615,7 +1615,7 @@ map_xml_name_to_sql_identifier(char *name)
 char *
 map_sql_value_to_xml_value(Datum value, Oid type, bool xml_escape_strings)
 {
-       if (type_is_array(type))
+       if (type_is_array_domain(type))
        {
                ArrayType  *array;
                Oid                     elmtype;
index 6fae6182932a32b578eac2ecf9ed973df404570a..740e8c4ab42eb4d1da7c4fab217ad3d893b91cf5 100644 (file)
@@ -2212,6 +2212,52 @@ get_array_type(Oid typid)
        return result;
 }
 
+/*
+ * get_base_element_type
+ *             Given the type OID, get the typelem, looking "through" any domain
+ *             to its underlying array type.
+ *
+ * This is equivalent to get_element_type(getBaseType(typid)), but avoids
+ * an extra cache lookup.  Note that it fails to provide any information
+ * about the typmod of the array.
+ */
+Oid
+get_base_element_type(Oid typid)
+{
+       /*
+        * We loop to find the bottom base type in a stack of domains.
+        */
+       for (;;)
+       {
+               HeapTuple       tup;
+               Form_pg_type typTup;
+
+               tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+               if (!HeapTupleIsValid(tup))
+                       break;
+               typTup = (Form_pg_type) GETSTRUCT(tup);
+               if (typTup->typtype != TYPTYPE_DOMAIN)
+               {
+                       /* Not a domain, so stop descending */
+                       Oid                     result;
+
+                       /* This test must match get_element_type */
+                       if (typTup->typlen == -1)
+                               result = typTup->typelem;
+                       else
+                               result = InvalidOid;
+                       ReleaseSysCache(tup);
+                       return result;
+               }
+
+               typid = typTup->typbasetype;
+               ReleaseSysCache(tup);
+       }
+
+       /* Like get_element_type, silently return InvalidOid for bogus input */
+       return InvalidOid;
+}
+
 /*
  * getTypeInputInfo
  *
index a4b7e4aa785abdf7957cef52f7d43e8b9e67fd31..1c9d2c2fa7b21c8f33e7c74b9a7dd7c36418583e 100644 (file)
@@ -2326,10 +2326,10 @@ get_call_expr_argtype(Node *expr, int argnum)
         */
        if (IsA(expr, ScalarArrayOpExpr) &&
                argnum == 1)
-               argtype = get_element_type(argtype);
+               argtype = get_base_element_type(argtype);
        else if (IsA(expr, ArrayCoerceExpr) &&
                         argnum == 0)
-               argtype = get_element_type(argtype);
+               argtype = get_base_element_type(argtype);
 
        return argtype;
 }
index 28fbffa3629af9d740f8b9b256c0f655dfb78366..e30a7d7298b489286c2e8cccea45a32de8ceb556 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201010151
+#define CATALOG_VERSION_NO     201010201
 
 #endif
index fc2c3066f0057872295d04e944dbd5148959a6c9..201e5dbc1f59c05128ff647084c9645d699edfe1 100644 (file)
@@ -189,8 +189,7 @@ CATALOG(pg_type,1247) BKI_BOOTSTRAP BKI_ROWTYPE_OID(71) BKI_SCHEMA_MACRO
 
        /*
         * typndims is the declared number of dimensions for an array domain type
-        * (i.e., typbasetype is an array type; the domain's typelem will match
-        * the base type's typelem).  Otherwise zero.
+        * (i.e., typbasetype is an array type).  Otherwise zero.
         */
        int4            typndims;
 
index ff8d11031d93e7daf08b68dfd8b9c7d516ca08eb..7312188667f4904c8031ef8743708da5323fb2be 100644 (file)
@@ -139,12 +139,12 @@ extern void cancel_parser_errposition_callback(ParseCallbackState *pcbstate);
 
 extern Var *make_var(ParseState *pstate, RangeTblEntry *rte, int attrno,
                 int location);
-extern Oid     transformArrayType(Oid arrayType);
+extern Oid     transformArrayType(Oid *arrayType, int32 *arrayTypmod);
 extern ArrayRef *transformArraySubscripts(ParseState *pstate,
                                                 Node *arrayBase,
                                                 Oid arrayType,
                                                 Oid elementType,
-                                                int32 elementTypMod,
+                                                int32 arrayTypMod,
                                                 List *indirection,
                                                 Node *assignFrom);
 extern Const *make_const(ParseState *pstate, Value *value, int location);
index 136bf386ca9a23588710fd0c52ac9002bfc3298e..02c0219fa0eb187041c843fd9b37c8119610c4e0 100644 (file)
@@ -117,6 +117,7 @@ extern void get_type_category_preferred(Oid typid,
 extern Oid     get_typ_typrelid(Oid typid);
 extern Oid     get_element_type(Oid typid);
 extern Oid     get_array_type(Oid typid);
+extern Oid     get_base_element_type(Oid typid);
 extern void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam);
 extern void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena);
 extern void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam);
@@ -138,6 +139,8 @@ extern void free_attstatsslot(Oid atttype,
 extern char *get_namespace_name(Oid nspid);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
+/* type_is_array_domain accepts both plain arrays and domains over arrays */
+#define type_is_array_domain(typid)  (get_base_element_type(typid) != InvalidOid)
 
 #define TypeIsToastable(typid) (get_typstorage(typid) != 'p')
 
index e3f857292579ec58c549abc620c8c901852c5ac9..9929e04e57bbc7d08938765b7f506c05c07b772b 100644 (file)
@@ -154,6 +154,7 @@ static void exec_assign_value(PLpgSQL_execstate *estate,
 static void exec_eval_datum(PLpgSQL_execstate *estate,
                                PLpgSQL_datum *datum,
                                Oid *typeid,
+                               int32 *typetypmod,
                                Datum *value,
                                bool *isnull);
 static int exec_eval_integer(PLpgSQL_execstate *estate,
@@ -3736,6 +3737,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                bool            oldarrayisnull;
                                Oid                     arraytypeid,
                                                        arrayelemtypeid;
+                               int32           arraytypmod;
                                int16           arraytyplen,
                                                        elemtyplen;
                                bool            elemtypbyval;
@@ -3780,8 +3782,13 @@ exec_assign_value(PLpgSQL_execstate *estate,
 
                                /* Fetch current value of array datum */
                                exec_eval_datum(estate, target,
-                                                         &arraytypeid, &oldarraydatum, &oldarrayisnull);
+                                                               &arraytypeid, &arraytypmod,
+                                                               &oldarraydatum, &oldarrayisnull);
 
+                               /* If target is domain over array, reduce to base type */
+                               arraytypeid = getBaseTypeAndTypmod(arraytypeid, &arraytypmod);
+
+                               /* ... and identify the element type */
                                arrayelemtypeid = get_element_type(arraytypeid);
                                if (!OidIsValid(arrayelemtypeid))
                                        ereport(ERROR,
@@ -3831,7 +3838,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                coerced_value = exec_simple_cast_value(value,
                                                                                                           valtype,
                                                                                                           arrayelemtypeid,
-                                                                                                          -1,
+                                                                                                          arraytypmod,
                                                                                                           *isNull);
 
                                /*
@@ -3875,7 +3882,9 @@ exec_assign_value(PLpgSQL_execstate *estate,
 
                                /*
                                 * Assign the new array to the base variable.  It's never NULL
-                                * at this point.
+                                * at this point.  Note that if the target is a domain,
+                                * coercing the base array type back up to the domain will
+                                * happen within exec_assign_value.
                                 */
                                *isNull = false;
                                exec_assign_value(estate, target,
@@ -3897,7 +3906,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
 /*
  * exec_eval_datum                             Get current value of a PLpgSQL_datum
  *
- * The type oid, value in Datum format, and null flag are returned.
+ * The type oid, typmod, value in Datum format, and null flag are returned.
  *
  * At present this doesn't handle PLpgSQL_expr or PLpgSQL_arrayelem datums.
  *
@@ -3910,6 +3919,7 @@ static void
 exec_eval_datum(PLpgSQL_execstate *estate,
                                PLpgSQL_datum *datum,
                                Oid *typeid,
+                               int32 *typetypmod,
                                Datum *value,
                                bool *isnull)
 {
@@ -3922,6 +3932,7 @@ exec_eval_datum(PLpgSQL_execstate *estate,
                                PLpgSQL_var *var = (PLpgSQL_var *) datum;
 
                                *typeid = var->datatype->typoid;
+                               *typetypmod = var->datatype->atttypmod;
                                *value = var->value;
                                *isnull = var->isnull;
                                break;
@@ -3942,6 +3953,7 @@ exec_eval_datum(PLpgSQL_execstate *estate,
                                        elog(ERROR, "row not compatible with its own tupdesc");
                                MemoryContextSwitchTo(oldcontext);
                                *typeid = row->rowtupdesc->tdtypeid;
+                               *typetypmod = row->rowtupdesc->tdtypmod;
                                *value = HeapTupleGetDatum(tup);
                                *isnull = false;
                                break;
@@ -3974,6 +3986,7 @@ exec_eval_datum(PLpgSQL_execstate *estate,
                                HeapTupleHeaderSetTypMod(worktup.t_data, rec->tupdesc->tdtypmod);
                                MemoryContextSwitchTo(oldcontext);
                                *typeid = rec->tupdesc->tdtypeid;
+                               *typetypmod = rec->tupdesc->tdtypmod;
                                *value = HeapTupleGetDatum(&worktup);
                                *isnull = false;
                                break;
@@ -3999,6 +4012,11 @@ exec_eval_datum(PLpgSQL_execstate *estate,
                                                         errmsg("record \"%s\" has no field \"%s\"",
                                                                        rec->refname, recfield->fieldname)));
                                *typeid = SPI_gettypeid(rec->tupdesc, fno);
+                               /* XXX there's no SPI_gettypmod, for some reason */
+                               if (fno > 0)
+                                       *typetypmod = rec->tupdesc->attrs[fno - 1]->atttypmod;
+                               else
+                                       *typetypmod = -1;
                                *value = SPI_getbinval(rec->tup, rec->tupdesc, fno, isnull);
                                break;
                        }
@@ -4671,6 +4689,7 @@ plpgsql_param_fetch(ParamListInfo params, int paramid)
        PLpgSQL_expr *expr;
        PLpgSQL_datum *datum;
        ParamExternData *prm;
+       int32           prmtypmod;
 
        /* paramid's are 1-based, but dnos are 0-based */
        dno = paramid - 1;
@@ -4693,7 +4712,8 @@ plpgsql_param_fetch(ParamListInfo params, int paramid)
        datum = estate->datums[dno];
        prm = &params->params[dno];
        exec_eval_datum(estate, datum,
-                                       &prm->ptype, &prm->value, &prm->isnull);
+                                       &prm->ptype, &prmtypmod,
+                                       &prm->value, &prm->isnull);
 }
 
 
@@ -4870,6 +4890,7 @@ make_tuple_from_row(PLpgSQL_execstate *estate,
        for (i = 0; i < natts; i++)
        {
                Oid                     fieldtypeid;
+               int32           fieldtypmod;
 
                if (tupdesc->attrs[i]->attisdropped)
                {
@@ -4880,9 +4901,11 @@ make_tuple_from_row(PLpgSQL_execstate *estate,
                        elog(ERROR, "dropped rowtype entry for non-dropped column");
 
                exec_eval_datum(estate, estate->datums[row->varnos[i]],
-                                               &fieldtypeid, &dvalues[i], &nulls[i]);
+                                               &fieldtypeid, &fieldtypmod,
+                                               &dvalues[i], &nulls[i]);
                if (fieldtypeid != tupdesc->attrs[i]->atttypid)
                        return NULL;
+               /* XXX should we insist on typmod match, too? */
        }
 
        tuple = heap_form_tuple(tupdesc, dvalues, nulls);
index c746b946b9d3a33a267be76a4b5ce551388881a9..7d72791e5ef1bb126032e100d0faf209396bfd74 100644 (file)
@@ -496,3 +496,102 @@ drop table ddtest2;
 drop type ddtest1;
 drop domain posint cascade;
 NOTICE:  drop cascades to type posint2
+--
+-- Check enforcement of domain-related typmod in plpgsql (bug #5717)
+--
+create or replace function array_elem_check(numeric) returns numeric as $$
+declare
+  x numeric(4,2)[1];
+begin
+  x[1] := $1;
+  return x[1];
+end$$ language plpgsql;
+select array_elem_check(121.00);
+ERROR:  numeric field overflow
+DETAIL:  A field with precision 4, scale 2 must round to an absolute value less than 10^2.
+CONTEXT:  PL/pgSQL function "array_elem_check" line 5 at assignment
+select array_elem_check(1.23456);
+ array_elem_check 
+------------------
+             1.23
+(1 row)
+
+create domain mynums as numeric(4,2)[1];
+create or replace function array_elem_check(numeric) returns numeric as $$
+declare
+  x mynums;
+begin
+  x[1] := $1;
+  return x[1];
+end$$ language plpgsql;
+select array_elem_check(121.00);
+ERROR:  numeric field overflow
+DETAIL:  A field with precision 4, scale 2 must round to an absolute value less than 10^2.
+CONTEXT:  PL/pgSQL function "array_elem_check" line 5 at assignment
+select array_elem_check(1.23456);
+ array_elem_check 
+------------------
+             1.23
+(1 row)
+
+create domain mynums2 as mynums;
+create or replace function array_elem_check(numeric) returns numeric as $$
+declare
+  x mynums2;
+begin
+  x[1] := $1;
+  return x[1];
+end$$ language plpgsql;
+select array_elem_check(121.00);
+ERROR:  numeric field overflow
+DETAIL:  A field with precision 4, scale 2 must round to an absolute value less than 10^2.
+CONTEXT:  PL/pgSQL function "array_elem_check" line 5 at assignment
+select array_elem_check(1.23456);
+ array_elem_check 
+------------------
+             1.23
+(1 row)
+
+drop function array_elem_check(numeric);
+--
+-- Check enforcement of array-level domain constraints
+--
+create domain orderedpair as int[2] check (value[1] < value[2]);
+select array[1,2]::orderedpair;
+ array 
+-------
+ {1,2}
+(1 row)
+
+select array[2,1]::orderedpair;  -- fail
+ERROR:  value for domain orderedpair violates check constraint "orderedpair_check"
+create temp table op (f1 orderedpair);
+insert into op values (array[1,2]);
+insert into op values (array[2,1]);  -- fail
+ERROR:  value for domain orderedpair violates check constraint "orderedpair_check"
+update op set f1[2] = 3;
+update op set f1[2] = 0;  -- fail
+ERROR:  value for domain orderedpair violates check constraint "orderedpair_check"
+select * from op;
+  f1   
+-------
+ {1,3}
+(1 row)
+
+create or replace function array_elem_check(int) returns int as $$
+declare
+  x orderedpair := '{1,2}';
+begin
+  x[2] := $1;
+  return x[2];
+end$$ language plpgsql;
+select array_elem_check(3);
+ array_elem_check 
+------------------
+                3
+(1 row)
+
+select array_elem_check(-1);
+ERROR:  value for domain orderedpair violates check constraint "orderedpair_check"
+CONTEXT:  PL/pgSQL function "array_elem_check" line 5 at assignment
+drop function array_elem_check(int);
index 1e5295899b22347da3217a389378255fe0438cef..545af6262201772c17a3eb06104d76afe662668b 100644 (file)
@@ -393,3 +393,76 @@ alter domain posint add constraint c2 check(value > 0); -- OK
 drop table ddtest2;
 drop type ddtest1;
 drop domain posint cascade;
+
+--
+-- Check enforcement of domain-related typmod in plpgsql (bug #5717)
+--
+
+create or replace function array_elem_check(numeric) returns numeric as $$
+declare
+  x numeric(4,2)[1];
+begin
+  x[1] := $1;
+  return x[1];
+end$$ language plpgsql;
+
+select array_elem_check(121.00);
+select array_elem_check(1.23456);
+
+create domain mynums as numeric(4,2)[1];
+
+create or replace function array_elem_check(numeric) returns numeric as $$
+declare
+  x mynums;
+begin
+  x[1] := $1;
+  return x[1];
+end$$ language plpgsql;
+
+select array_elem_check(121.00);
+select array_elem_check(1.23456);
+
+create domain mynums2 as mynums;
+
+create or replace function array_elem_check(numeric) returns numeric as $$
+declare
+  x mynums2;
+begin
+  x[1] := $1;
+  return x[1];
+end$$ language plpgsql;
+
+select array_elem_check(121.00);
+select array_elem_check(1.23456);
+
+drop function array_elem_check(numeric);
+
+--
+-- Check enforcement of array-level domain constraints
+--
+
+create domain orderedpair as int[2] check (value[1] < value[2]);
+
+select array[1,2]::orderedpair;
+select array[2,1]::orderedpair;  -- fail
+
+create temp table op (f1 orderedpair);
+insert into op values (array[1,2]);
+insert into op values (array[2,1]);  -- fail
+
+update op set f1[2] = 3;
+update op set f1[2] = 0;  -- fail
+select * from op;
+
+create or replace function array_elem_check(int) returns int as $$
+declare
+  x orderedpair := '{1,2}';
+begin
+  x[2] := $1;
+  return x[2];
+end$$ language plpgsql;
+
+select array_elem_check(3);
+select array_elem_check(-1);
+
+drop function array_elem_check(int);