]> granicus.if.org Git - postgresql/commitdiff
Fix ALTER TABLE / SET TYPE for irregular inheritance
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 9 Jan 2017 22:26:58 +0000 (19:26 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 9 Jan 2017 22:26:58 +0000 (19:26 -0300)
If inherited tables don't have exactly the same schema, the USING clause
in an ALTER TABLE / SET DATA TYPE misbehaves when applied to the
children tables since commit 9550e8348b79.  Starting with that commit,
the attribute numbers in the USING expression are fixed during parse
analysis.  This can lead to bogus errors being reported during
execution, such as:
   ERROR:  attribute 2 has wrong type
   DETAIL:  Table has type smallint, but query expects integer.

Since it wouldn't do to revert to the original coding, we now apply a
transformation to map the attribute numbers to the correct ones for each
child.

Reported by Justin Pryzby
Analysis by Tom Lane; patch by me.
Discussion: https://postgr.es/m/20170102225618.GA10071@telsasoft.com

src/backend/access/common/tupconvert.c
src/backend/commands/tablecmds.c
src/include/access/tupconvert.h
src/test/regress/expected/alter_table.out
src/test/regress/sql/alter_table.sql

index 4787d4ca98f945464881622ddca822fb96ba2ad3..b2546f9bbcbddb9095e084bb0d6788e8f667e31e 100644 (file)
@@ -206,55 +206,12 @@ convert_tuples_by_name(TupleDesc indesc,
 {
        TupleConversionMap *map;
        AttrNumber *attrMap;
-       int                     n;
+       int                     n = outdesc->natts;
        int                     i;
        bool            same;
 
        /* Verify compatibility and prepare attribute-number map */
-       n = outdesc->natts;
-       attrMap = (AttrNumber *) palloc0(n * sizeof(AttrNumber));
-       for (i = 0; i < n; i++)
-       {
-               Form_pg_attribute att = outdesc->attrs[i];
-               char       *attname;
-               Oid                     atttypid;
-               int32           atttypmod;
-               int                     j;
-
-               if (att->attisdropped)
-                       continue;                       /* attrMap[i] is already 0 */
-               attname = NameStr(att->attname);
-               atttypid = att->atttypid;
-               atttypmod = att->atttypmod;
-               for (j = 0; j < indesc->natts; j++)
-               {
-                       att = indesc->attrs[j];
-                       if (att->attisdropped)
-                               continue;
-                       if (strcmp(attname, NameStr(att->attname)) == 0)
-                       {
-                               /* Found it, check type */
-                               if (atttypid != att->atttypid || atttypmod != att->atttypmod)
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
-                                                        errmsg_internal("%s", _(msg)),
-                                                        errdetail("Attribute \"%s\" of type %s does not match corresponding attribute of type %s.",
-                                                                          attname,
-                                                                          format_type_be(outdesc->tdtypeid),
-                                                                          format_type_be(indesc->tdtypeid))));
-                               attrMap[i] = (AttrNumber) (j + 1);
-                               break;
-                       }
-               }
-               if (attrMap[i] == 0)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
-                                        errmsg_internal("%s", _(msg)),
-                                        errdetail("Attribute \"%s\" of type %s does not exist in type %s.",
-                                                          attname,
-                                                          format_type_be(outdesc->tdtypeid),
-                                                          format_type_be(indesc->tdtypeid))));
-       }
+       attrMap = convert_tuples_by_name_map(indesc, outdesc, msg);
 
        /*
         * Check to see if the map is one-to-one and the tuple types are the same.
@@ -312,6 +269,69 @@ convert_tuples_by_name(TupleDesc indesc,
        return map;
 }
 
+/*
+ * Return a palloc'd bare attribute map for tuple conversion, matching input
+ * and output columns by name.  (Dropped columns are ignored in both input and
+ * output.)  This is normally a subroutine for convert_tuples_by_name, but can
+ * be used standalone.
+ */
+AttrNumber *
+convert_tuples_by_name_map(TupleDesc indesc,
+                                                  TupleDesc outdesc,
+                                                  const char *msg)
+{
+       AttrNumber *attrMap;
+       int                     n;
+       int                     i;
+
+       n = outdesc->natts;
+       attrMap = (AttrNumber *) palloc0(n * sizeof(AttrNumber));
+       for (i = 0; i < n; i++)
+       {
+               Form_pg_attribute att = outdesc->attrs[i];
+               char       *attname;
+               Oid                     atttypid;
+               int32           atttypmod;
+               int                     j;
+
+               if (att->attisdropped)
+                       continue;                       /* attrMap[i] is already 0 */
+               attname = NameStr(att->attname);
+               atttypid = att->atttypid;
+               atttypmod = att->atttypmod;
+               for (j = 0; j < indesc->natts; j++)
+               {
+                       att = indesc->attrs[j];
+                       if (att->attisdropped)
+                               continue;
+                       if (strcmp(attname, NameStr(att->attname)) == 0)
+                       {
+                               /* Found it, check type */
+                               if (atttypid != att->atttypid || atttypmod != att->atttypmod)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                                        errmsg_internal("%s", _(msg)),
+                                                        errdetail("Attribute \"%s\" of type %s does not match corresponding attribute of type %s.",
+                                                                          attname,
+                                                                          format_type_be(outdesc->tdtypeid),
+                                                                          format_type_be(indesc->tdtypeid))));
+                               attrMap[i] = (AttrNumber) (j + 1);
+                               break;
+                       }
+               }
+               if (attrMap[i] == 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg_internal("%s", _(msg)),
+                                        errdetail("Attribute \"%s\" of type %s does not exist in type %s.",
+                                                          attname,
+                                                          format_type_be(outdesc->tdtypeid),
+                                                          format_type_be(indesc->tdtypeid))));
+       }
+
+       return attrMap;
+}
+
 /*
  * Perform conversion of a tuple according to the map.
  */
index 588677c7bc2adf4461f7ee85d299e5817006ad12..8e0483ef3d746556e7265c8b5c0570ee7e441ea0 100644 (file)
@@ -20,6 +20,7 @@
 #include "access/reloptions.h"
 #include "access/relscan.h"
 #include "access/sysattr.h"
+#include "access/tupconvert.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
@@ -8029,12 +8030,69 @@ ATPrepAlterColumnType(List **wqueue,
        ReleaseSysCache(tuple);
 
        /*
-        * The recursion case is handled by ATSimpleRecursion.  However, if we are
-        * told not to recurse, there had better not be any child tables; else the
-        * alter would put them out of step.
+        * Recurse manually by queueing a new command for each child, if
+        * necessary. We cannot apply ATSimpleRecursion here because we need to
+        * remap attribute numbers in the USING expression, if any.
+        *
+        * If we are told not to recurse, there had better not be any child
+        * tables; else the alter would put them out of step.
         */
        if (recurse)
-               ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+       {
+               Oid                     relid = RelationGetRelid(rel);
+               ListCell   *child;
+               List       *children;
+
+               children = find_all_inheritors(relid, lockmode, NULL);
+
+               /*
+                * find_all_inheritors does the recursive search of the inheritance
+                * hierarchy, so all we have to do is process all of the relids in the
+                * list that it returns.
+                */
+               foreach(child, children)
+               {
+                       Oid                     childrelid = lfirst_oid(child);
+                       Relation        childrel;
+
+                       if (childrelid == relid)
+                               continue;
+
+                       /* find_all_inheritors already got lock */
+                       childrel = relation_open(childrelid, NoLock);
+                       CheckTableNotInUse(childrel, "ALTER TABLE");
+
+                       /*
+                        * Remap the attribute numbers.  If no USING expression was
+                        * specified, there is no need for this step.
+                        */
+                       if (def->cooked_default)
+                       {
+                               AttrNumber *attmap;
+                               bool            found_whole_row;
+
+                               /* create a copy to scribble on */
+                               cmd = copyObject(cmd);
+
+                               attmap = convert_tuples_by_name_map(RelationGetDescr(childrel),
+                                                                                                       RelationGetDescr(rel),
+                                                                gettext_noop("could not convert row type"));
+                               ((ColumnDef *) cmd->def)->cooked_default =
+                                       map_variable_attnos(def->cooked_default,
+                                                                               1, 0,
+                                                                               attmap, RelationGetDescr(rel)->natts,
+                                                                               &found_whole_row);
+                               if (found_whole_row)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                 errmsg("cannot convert whole-row table reference"),
+                                                        errdetail("USING expression contains a whole-row table reference.")));
+                               pfree(attmap);
+                       }
+                       ATPrepCmd(wqueue, childrel, cmd, false, true, lockmode);
+                       relation_close(childrel, NoLock);
+               }
+       }
        else if (!recursing &&
                         find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
                ereport(ERROR,
index 10556eec7e186966142af9919627ef4ba1718245..a7e48eb3811b30537d3661938ba0746e0c246dd3 100644 (file)
@@ -38,6 +38,10 @@ extern TupleConversionMap *convert_tuples_by_name(TupleDesc indesc,
                                           TupleDesc outdesc,
                                           const char *msg);
 
+extern AttrNumber *convert_tuples_by_name_map(TupleDesc indesc,
+                                                  TupleDesc outdesc,
+                                                  const char *msg);
+
 extern HeapTuple do_convert_tuple(HeapTuple tuple, TupleConversionMap *map);
 
 extern void free_conversion_map(TupleConversionMap *map);
index a1ab823acfeb4f89b4d5e95edccf10588ccfd609..fadad222b030cd9a532e619e17de5c7ca245ebfe 100644 (file)
@@ -2048,6 +2048,28 @@ select relname, conname, coninhcount, conislocal, connoinherit
  test_inh_check_child | test_inh_check_a_check |           1 | f          | f
 (6 rows)
 
+-- ALTER COLUMN TYPE with different schema in children
+-- Bug at https://postgr.es/m/20170102225618.GA10071@telsasoft.com
+CREATE TABLE test_type_diff (f1 int);
+CREATE TABLE test_type_diff_c (extra smallint) INHERITS (test_type_diff);
+ALTER TABLE test_type_diff ADD COLUMN f2 int;
+INSERT INTO test_type_diff_c VALUES (1, 2, 3);
+ALTER TABLE test_type_diff ALTER COLUMN f2 TYPE bigint USING f2::bigint;
+CREATE TABLE test_type_diff2 (int_two int2, int_four int4, int_eight int8);
+CREATE TABLE test_type_diff2_c1 (int_four int4, int_eight int8, int_two int2);
+CREATE TABLE test_type_diff2_c2 (int_eight int8, int_two int2, int_four int4);
+CREATE TABLE test_type_diff2_c3 (int_two int2, int_four int4, int_eight int8);
+ALTER TABLE test_type_diff2_c1 INHERIT test_type_diff2;
+ALTER TABLE test_type_diff2_c2 INHERIT test_type_diff2;
+ALTER TABLE test_type_diff2_c3 INHERIT test_type_diff2;
+INSERT INTO test_type_diff2_c1 VALUES (1, 2, 3);
+INSERT INTO test_type_diff2_c2 VALUES (4, 5, 6);
+INSERT INTO test_type_diff2_c3 VALUES (7, 8, 9);
+ALTER TABLE test_type_diff2 ALTER COLUMN int_four TYPE int8 USING int_four::int8;
+-- whole-row references are disallowed
+ALTER TABLE test_type_diff2 ALTER COLUMN int_four TYPE int4 USING (pg_column_size(test_type_diff2));
+ERROR:  cannot convert whole-row table reference
+DETAIL:  USING expression contains a whole-row table reference.
 -- check for rollback of ANALYZE corrupting table property flags (bug #11638)
 CREATE TABLE check_fk_presence_1 (id int PRIMARY KEY, t text);
 CREATE TABLE check_fk_presence_2 (id int REFERENCES check_fk_presence_1, t text);
index c8eed3ec646c2bfc716be3a881a34296f96f79dc..d2f06a360a5976e46c183e2d9e68cdb2b8185c8d 100644 (file)
@@ -1335,6 +1335,28 @@ select relname, conname, coninhcount, conislocal, connoinherit
   where relname like 'test_inh_check%' and c.conrelid = r.oid
   order by 1, 2;
 
+-- ALTER COLUMN TYPE with different schema in children
+-- Bug at https://postgr.es/m/20170102225618.GA10071@telsasoft.com
+CREATE TABLE test_type_diff (f1 int);
+CREATE TABLE test_type_diff_c (extra smallint) INHERITS (test_type_diff);
+ALTER TABLE test_type_diff ADD COLUMN f2 int;
+INSERT INTO test_type_diff_c VALUES (1, 2, 3);
+ALTER TABLE test_type_diff ALTER COLUMN f2 TYPE bigint USING f2::bigint;
+
+CREATE TABLE test_type_diff2 (int_two int2, int_four int4, int_eight int8);
+CREATE TABLE test_type_diff2_c1 (int_four int4, int_eight int8, int_two int2);
+CREATE TABLE test_type_diff2_c2 (int_eight int8, int_two int2, int_four int4);
+CREATE TABLE test_type_diff2_c3 (int_two int2, int_four int4, int_eight int8);
+ALTER TABLE test_type_diff2_c1 INHERIT test_type_diff2;
+ALTER TABLE test_type_diff2_c2 INHERIT test_type_diff2;
+ALTER TABLE test_type_diff2_c3 INHERIT test_type_diff2;
+INSERT INTO test_type_diff2_c1 VALUES (1, 2, 3);
+INSERT INTO test_type_diff2_c2 VALUES (4, 5, 6);
+INSERT INTO test_type_diff2_c3 VALUES (7, 8, 9);
+ALTER TABLE test_type_diff2 ALTER COLUMN int_four TYPE int8 USING int_four::int8;
+-- whole-row references are disallowed
+ALTER TABLE test_type_diff2 ALTER COLUMN int_four TYPE int4 USING (pg_column_size(test_type_diff2));
+
 -- check for rollback of ANALYZE corrupting table property flags (bug #11638)
 CREATE TABLE check_fk_presence_1 (id int PRIMARY KEY, t text);
 CREATE TABLE check_fk_presence_2 (id int REFERENCES check_fk_presence_1, t text);