]> granicus.if.org Git - postgresql/commitdiff
Fix multiple assignments to a column of a domain type.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 11 Jul 2017 20:48:59 +0000 (16:48 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 11 Jul 2017 20:48:59 +0000 (16:48 -0400)
We allow INSERT and UPDATE commands to assign to the same column more than
once, as long as the assignments are to subfields or elements rather than
the whole column.  However, this failed when the target column was a domain
over array rather than plain array.  Fix by teaching process_matched_tle()
to look through CoerceToDomain nodes, and add relevant test cases.

Also add a group of test cases exercising domains over array of composite.
It's doubtless accidental that CREATE DOMAIN allows this case while not
allowing straight domain over composite; but it does, so we'd better make
sure we don't break it.  (I could not find any documentation mentioning
either side of that, so no doc changes.)

It's been like this for a long time, so back-patch to all supported
branches.

Discussion: https://postgr.es/m/4206.1499798337@sss.pgh.pa.us

src/backend/rewrite/rewriteHandler.c
src/test/regress/expected/domain.out
src/test/regress/sql/domain.sql

index a22a11e2c19a917dfe32c1a2d93cf6fb931f792c..6e799f721dae811b5fa302a4c21efc49073df7c8 100644 (file)
@@ -878,6 +878,7 @@ process_matched_tle(TargetEntry *src_tle,
                                        const char *attrName)
 {
        TargetEntry *result;
+       CoerceToDomain *coerce_expr = NULL;
        Node       *src_expr;
        Node       *prior_expr;
        Node       *src_input;
@@ -914,10 +915,30 @@ process_matched_tle(TargetEntry *src_tle,
         * For FieldStore, instead of nesting we can generate a single
         * FieldStore with multiple target fields.  We must nest when
         * ArrayRefs are involved though.
+        *
+        * As a further complication, the destination column might be a domain,
+        * resulting in each assignment containing a CoerceToDomain node over a
+        * FieldStore or ArrayRef.  These should have matching target domains,
+        * so we strip them and reconstitute a single CoerceToDomain over the
+        * combined FieldStore/ArrayRef nodes.  (Notice that this has the result
+        * that the domain's checks are applied only after we do all the field or
+        * element updates, not after each one.  This is arguably desirable.)
         *----------
         */
        src_expr = (Node *) src_tle->expr;
        prior_expr = (Node *) prior_tle->expr;
+
+       if (src_expr && IsA(src_expr, CoerceToDomain) &&
+               prior_expr && IsA(prior_expr, CoerceToDomain) &&
+               ((CoerceToDomain *) src_expr)->resulttype ==
+               ((CoerceToDomain *) prior_expr)->resulttype)
+       {
+               /* we assume without checking that resulttypmod/resultcollid match */
+               coerce_expr = (CoerceToDomain *) src_expr;
+               src_expr = (Node *) ((CoerceToDomain *) src_expr)->arg;
+               prior_expr = (Node *) ((CoerceToDomain *) prior_expr)->arg;
+       }
+
        src_input = get_assignment_input(src_expr);
        prior_input = get_assignment_input(prior_expr);
        if (src_input == NULL ||
@@ -986,6 +1007,16 @@ process_matched_tle(TargetEntry *src_tle,
                newexpr = NULL;
        }
 
+       if (coerce_expr)
+       {
+               /* put back the CoerceToDomain */
+               CoerceToDomain *newcoerce = makeNode(CoerceToDomain);
+
+               memcpy(newcoerce, coerce_expr, sizeof(CoerceToDomain));
+               newcoerce->arg = (Expr *) newexpr;
+               newexpr = (Node *) newcoerce;
+       }
+
        result = flatCopyTargetEntry(src_tle);
        result->expr = (Expr *) newexpr;
        return result;
index 41b70e287bde14cfb68281ab81426b73b2062ecc..9fb750d2d36a53f50df668d7c5b6b1410ab24f00 100644 (file)
@@ -107,6 +107,7 @@ INSERT INTO domarrtest values ('{2,2}', '{{"a"},{"c"}}');
 INSERT INTO domarrtest values (NULL, '{{"a","b","c"},{"d","e","f"}}');
 INSERT INTO domarrtest values (NULL, '{{"toolong","b","c"},{"d","e","f"}}');
 ERROR:  value too long for type character varying(4)
+INSERT INTO domarrtest (testint4arr[1], testint4arr[3]) values (11,22);
 select * from domarrtest;
   testint4arr  |    testchar4arr     
 ---------------+---------------------
@@ -115,7 +116,8 @@ select * from domarrtest;
  {2,2}         | {{a,b},{c,d},{e,f}}
  {2,2}         | {{a},{c}}
                | {{a,b,c},{d,e,f}}
-(5 rows)
+ {11,NULL,22}  | 
+(6 rows)
 
 select testint4arr[1], testchar4arr[2:2] from domarrtest;
  testint4arr | testchar4arr 
@@ -125,7 +127,8 @@ select testint4arr[1], testchar4arr[2:2] from domarrtest;
            2 | {{c,d}}
            2 | {{c}}
              | {{d,e,f}}
-(5 rows)
+          11 | 
+(6 rows)
 
 select array_dims(testint4arr), array_dims(testchar4arr) from domarrtest;
  array_dims | array_dims 
@@ -135,7 +138,8 @@ select array_dims(testint4arr), array_dims(testchar4arr) from domarrtest;
  [1:2]      | [1:3][1:2]
  [1:2]      | [1:2][1:1]
             | [1:2][1:3]
-(5 rows)
+ [1:3]      | 
+(6 rows)
 
 COPY domarrtest FROM stdin;
 COPY domarrtest FROM stdin;    -- fail
@@ -149,9 +153,21 @@ select * from domarrtest;
  {2,2}         | {{a,b},{c,d},{e,f}}
  {2,2}         | {{a},{c}}
                | {{a,b,c},{d,e,f}}
+ {11,NULL,22}  | 
  {3,4}         | {q,w,e}
                | 
-(7 rows)
+(8 rows)
+
+update domarrtest set
+  testint4arr[1] = testint4arr[1] + 1,
+  testint4arr[3] = testint4arr[3] - 1
+where testchar4arr is null;
+select * from domarrtest where testchar4arr is null;
+   testint4arr    | testchar4arr 
+------------------+--------------
+ {12,NULL,21}     | 
+ {NULL,NULL,NULL} | 
+(2 rows)
 
 drop table domarrtest;
 drop domain domainint4arr restrict;
@@ -182,6 +198,92 @@ select pg_typeof('{1,2,3}'::dia || 42); -- should be int[] not dia
 (1 row)
 
 drop domain dia;
+-- Test domains over arrays of composite
+create type comptype as (r float8, i float8);
+create domain dcomptypea as comptype[];
+create table dcomptable (d1 dcomptypea unique);
+insert into dcomptable values (array[row(1,2)]::dcomptypea);
+insert into dcomptable values (array[row(3,4), row(5,6)]::comptype[]);
+insert into dcomptable values (array[row(7,8)::comptype, row(9,10)::comptype]);
+insert into dcomptable values (array[row(1,2)]::dcomptypea);  -- fail on uniqueness
+ERROR:  duplicate key value violates unique constraint "dcomptable_d1_key"
+DETAIL:  Key (d1)=({"(1,2)"}) already exists.
+insert into dcomptable (d1[1]) values(row(9,10));
+insert into dcomptable (d1[1].r) values(11);
+select * from dcomptable;
+         d1         
+--------------------
+ {"(1,2)"}
+ {"(3,4)","(5,6)"}
+ {"(7,8)","(9,10)"}
+ {"(9,10)"}
+ {"(11,)"}
+(5 rows)
+
+select d1[2], d1[1].r, d1[1].i from dcomptable;
+   d1   | r  | i  
+--------+----+----
+        |  1 |  2
+ (5,6)  |  3 |  4
+ (9,10) |  7 |  8
+        |  9 | 10
+        | 11 |   
+(5 rows)
+
+update dcomptable set d1[2] = row(d1[2].i, d1[2].r);
+select * from dcomptable;
+         d1         
+--------------------
+ {"(1,2)","(,)"}
+ {"(3,4)","(6,5)"}
+ {"(7,8)","(10,9)"}
+ {"(9,10)","(,)"}
+ {"(11,)","(,)"}
+(5 rows)
+
+update dcomptable set d1[1].r = d1[1].r + 1 where d1[1].i > 0;
+select * from dcomptable;
+         d1         
+--------------------
+ {"(11,)","(,)"}
+ {"(2,2)","(,)"}
+ {"(4,4)","(6,5)"}
+ {"(8,8)","(10,9)"}
+ {"(10,10)","(,)"}
+(5 rows)
+
+alter domain dcomptypea add constraint c1 check (value[1].r <= value[1].i);
+alter domain dcomptypea add constraint c2 check (value[1].r > value[1].i);  -- fail
+ERROR:  column "d1" of table "dcomptable" contains values that violate the new constraint
+select array[row(2,1)]::dcomptypea;  -- fail
+ERROR:  value for domain dcomptypea violates check constraint "c1"
+insert into dcomptable values (array[row(1,2)]::comptype[]);
+insert into dcomptable values (array[row(2,1)]::comptype[]);  -- fail
+ERROR:  value for domain dcomptypea violates check constraint "c1"
+insert into dcomptable (d1[1].r) values(99);
+insert into dcomptable (d1[1].r, d1[1].i) values(99, 100);
+insert into dcomptable (d1[1].r, d1[1].i) values(100, 99);  -- fail
+ERROR:  value for domain dcomptypea violates check constraint "c1"
+update dcomptable set d1[1].r = d1[1].r + 1 where d1[1].i > 0;  -- fail
+ERROR:  value for domain dcomptypea violates check constraint "c1"
+update dcomptable set d1[1].r = d1[1].r - 1 where d1[1].i > 0;
+select * from dcomptable;
+         d1         
+--------------------
+ {"(11,)","(,)"}
+ {"(99,)"}
+ {"(1,2)","(,)"}
+ {"(3,4)","(6,5)"}
+ {"(7,8)","(10,9)"}
+ {"(9,10)","(,)"}
+ {"(0,2)"}
+ {"(98,100)"}
+(8 rows)
+
+drop table dcomptable;
+drop type comptype cascade;
+NOTICE:  drop cascades to type dcomptypea
+-- Test not-null restrictions
 create domain dnotnull varchar(15) NOT NULL;
 create domain dnull    varchar(15);
 create domain dcheck   varchar(15) NOT NULL CHECK (VALUE = 'a' OR VALUE = 'c' OR VALUE = 'd');
index 407d3efc35ae70e081ebc6ec56be09b83ac381c7..1fd7b11fd439aea0e9f3de44dd1108b25878dd4d 100644 (file)
@@ -85,6 +85,7 @@ INSERT INTO domarrtest values ('{2,2}', '{{"a","b"},{"c","d"},{"e","f"}}');
 INSERT INTO domarrtest values ('{2,2}', '{{"a"},{"c"}}');
 INSERT INTO domarrtest values (NULL, '{{"a","b","c"},{"d","e","f"}}');
 INSERT INTO domarrtest values (NULL, '{{"toolong","b","c"},{"d","e","f"}}');
+INSERT INTO domarrtest (testint4arr[1], testint4arr[3]) values (11,22);
 select * from domarrtest;
 select testint4arr[1], testchar4arr[2:2] from domarrtest;
 select array_dims(testint4arr), array_dims(testchar4arr) from domarrtest;
@@ -100,6 +101,13 @@ COPY domarrtest FROM stdin;        -- fail
 
 select * from domarrtest;
 
+update domarrtest set
+  testint4arr[1] = testint4arr[1] + 1,
+  testint4arr[3] = testint4arr[3] - 1
+where testchar4arr is null;
+
+select * from domarrtest where testchar4arr is null;
+
 drop table domarrtest;
 drop domain domainint4arr restrict;
 drop domain domainchar4arr restrict;
@@ -111,6 +119,46 @@ select pg_typeof('{1,2,3}'::dia);
 select pg_typeof('{1,2,3}'::dia || 42); -- should be int[] not dia
 drop domain dia;
 
+
+-- Test domains over arrays of composite
+
+create type comptype as (r float8, i float8);
+create domain dcomptypea as comptype[];
+create table dcomptable (d1 dcomptypea unique);
+
+insert into dcomptable values (array[row(1,2)]::dcomptypea);
+insert into dcomptable values (array[row(3,4), row(5,6)]::comptype[]);
+insert into dcomptable values (array[row(7,8)::comptype, row(9,10)::comptype]);
+insert into dcomptable values (array[row(1,2)]::dcomptypea);  -- fail on uniqueness
+insert into dcomptable (d1[1]) values(row(9,10));
+insert into dcomptable (d1[1].r) values(11);
+
+select * from dcomptable;
+select d1[2], d1[1].r, d1[1].i from dcomptable;
+update dcomptable set d1[2] = row(d1[2].i, d1[2].r);
+select * from dcomptable;
+update dcomptable set d1[1].r = d1[1].r + 1 where d1[1].i > 0;
+select * from dcomptable;
+
+alter domain dcomptypea add constraint c1 check (value[1].r <= value[1].i);
+alter domain dcomptypea add constraint c2 check (value[1].r > value[1].i);  -- fail
+
+select array[row(2,1)]::dcomptypea;  -- fail
+insert into dcomptable values (array[row(1,2)]::comptype[]);
+insert into dcomptable values (array[row(2,1)]::comptype[]);  -- fail
+insert into dcomptable (d1[1].r) values(99);
+insert into dcomptable (d1[1].r, d1[1].i) values(99, 100);
+insert into dcomptable (d1[1].r, d1[1].i) values(100, 99);  -- fail
+update dcomptable set d1[1].r = d1[1].r + 1 where d1[1].i > 0;  -- fail
+update dcomptable set d1[1].r = d1[1].r - 1 where d1[1].i > 0;
+select * from dcomptable;
+
+drop table dcomptable;
+drop type comptype cascade;
+
+
+-- Test not-null restrictions
+
 create domain dnotnull varchar(15) NOT NULL;
 create domain dnull    varchar(15);
 create domain dcheck   varchar(15) NOT NULL CHECK (VALUE = 'a' OR VALUE = 'c' OR VALUE = 'd');