]> granicus.if.org Git - postgresql/commitdiff
Rearrange "add column" logic to merge columns at exec time.
authorRobert Haas <rhaas@postgresql.org>
Mon, 4 Apr 2011 01:52:47 +0000 (21:52 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 4 Apr 2011 01:53:32 +0000 (21:53 -0400)
The previous coding set attinhcount too high in some cases, resulting in
an undumpable, undroppable column.  Per bug #5856, reported by Naoya
Anzai.  See also commit 31b6fc06d83c6de3644c8f2921eb7de0eb92fac3, which
fixes a similar bug in ALTER TABLE .. ADD CONSTRAINT.

Patch by Noah Misch.

src/backend/commands/tablecmds.c
src/include/nodes/parsenodes.h
src/test/regress/expected/alter_table.out
src/test/regress/sql/alter_table.sql

index 737ab1a7bc8c3335a79b5343cc48430a2e4c2427..4a97819b01f5210c2fd02cd7973db272bdabb51a 100644 (file)
@@ -285,16 +285,15 @@ static void ATSimplePermissions(Relation rel, int allowed_targets);
 static void ATWrongRelkindError(Relation rel, int allowed_targets);
 static void ATSimpleRecursion(List **wqueue, Relation rel,
                                  AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode);
-static void ATOneLevelRecursion(List **wqueue, Relation rel,
-                                       AlterTableCmd *cmd, LOCKMODE lockmode);
 static void ATTypedTableRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd,
                                                                  LOCKMODE lockmode);
 static List *find_typed_table_dependencies(Oid typeOid, const char *typeName,
                                                                                   DropBehavior behavior);
 static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing,
                                AlterTableCmd *cmd, LOCKMODE lockmode);
-static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
-                               ColumnDef *colDef, bool isOid, LOCKMODE lockmode);
+static void ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
+                               ColumnDef *colDef, bool isOid,
+                               bool recurse, bool recursing, LOCKMODE lockmode);
 static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid, Oid collid);
 static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse,
                          AlterTableCmd *cmd, LOCKMODE lockmode);
@@ -2775,15 +2774,15 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
                case AT_AddColumn:              /* ADD COLUMN */
                        ATSimplePermissions(rel,
                                ATT_TABLE|ATT_COMPOSITE_TYPE|ATT_FOREIGN_TABLE);
-                       /* Performs own recursion */
                        ATPrepAddColumn(wqueue, rel, recurse, recursing, cmd, lockmode);
+                       /* Recursion occurs during execution phase */
                        pass = AT_PASS_ADD_COL;
                        break;
                case AT_AddColumnToView:                /* add column via CREATE OR REPLACE
                                                                                 * VIEW */
                        ATSimplePermissions(rel, ATT_VIEW);
-                       /* Performs own recursion */
                        ATPrepAddColumn(wqueue, rel, recurse, recursing, cmd, lockmode);
+                       /* Recursion occurs during execution phase */
                        pass = AT_PASS_ADD_COL;
                        break;
                case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
@@ -2885,9 +2884,9 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
                        break;
                case AT_AddOids:                /* SET WITH OIDS */
                        ATSimplePermissions(rel, ATT_TABLE);
-                       /* Performs own recursion */
                        if (!rel->rd_rel->relhasoids || recursing)
                                ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode);
+                       /* Recursion occurs during execution phase */
                        pass = AT_PASS_ADD_COL;
                        break;
                case AT_DropOids:               /* SET WITHOUT OIDS */
@@ -3043,7 +3042,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
                case AT_AddColumn:              /* ADD COLUMN */
                case AT_AddColumnToView:                /* add column via CREATE OR REPLACE
                                                                                 * VIEW */
-                       ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, false, lockmode);
+                       ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def,
+                                                       false, false, false, lockmode);
+                       break;
+               case AT_AddColumnRecurse:
+                       ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def,
+                                                       false, true, false, lockmode);
                        break;
                case AT_ColumnDefault:  /* ALTER COLUMN DEFAULT */
                        ATExecColumnDefault(rel, cmd->name, cmd->def, lockmode);
@@ -3121,7 +3125,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
                case AT_AddOids:                /* SET WITH OIDS */
                        /* Use the ADD COLUMN code, unless prep decided to do nothing */
                        if (cmd->def != NULL)
-                               ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, true, lockmode);
+                               ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def,
+                                                               true, false, false, lockmode);
+                       break;
+               case AT_AddOidsRecurse: /* SET WITH OIDS */
+                       /* Use the ADD COLUMN code, unless prep decided to do nothing */
+                       if (cmd->def != NULL)
+                               ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def,
+                                                               true, true, false, lockmode);
                        break;
                case AT_DropOids:               /* SET WITHOUT OIDS */
 
@@ -3842,37 +3853,6 @@ ATSimpleRecursion(List **wqueue, Relation rel,
        }
 }
 
-/*
- * ATOneLevelRecursion
- *
- * Here, we visit only direct inheritance children.  It is expected that
- * the command's prep routine will recurse again to find indirect children.
- * When using this technique, a multiply-inheriting child will be visited
- * multiple times.
- */
-static void
-ATOneLevelRecursion(List **wqueue, Relation rel,
-                                       AlterTableCmd *cmd, LOCKMODE lockmode)
-{
-       Oid                     relid = RelationGetRelid(rel);
-       ListCell   *child;
-       List       *children;
-
-       children = find_inheritance_children(relid, lockmode);
-
-       foreach(child, children)
-       {
-               Oid                     childrelid = lfirst_oid(child);
-               Relation        childrel;
-
-               /* find_inheritance_children already got lock */
-               childrel = relation_open(childrelid, NoLock);
-               CheckTableNotInUse(childrel, "ALTER TABLE");
-               ATPrepCmd(wqueue, childrel, cmd, true, true, lockmode);
-               relation_close(childrel, NoLock);
-       }
-}
-
 /*
  * ATTypedTableRecursion
  *
@@ -4060,6 +4040,12 @@ find_typed_table_dependencies(Oid typeOid, const char *typeName, DropBehavior be
  * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
  * AT_AddColumn AlterTableCmd by parse_utilcmd.c and added as independent
  * AlterTableCmd's.
+ *
+ * ADD COLUMN cannot use the normal ALTER TABLE recursion mechanism, because we
+ * have to decide at runtime whether to recurse or not depending on whether we
+ * actually add a column or merely merge with an existing column.  (We can't
+ * check this in a static pre-pass because it won't handle multiple inheritance
+ * situations correctly.)
  */
 static void
 ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing,
@@ -4070,43 +4056,17 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("cannot add column to typed table")));
 
-       /*
-        * Recurse to add the column to child classes, if requested.
-        *
-        * We must recurse one level at a time, so that multiply-inheriting
-        * children are visited the right number of times and end up with the
-        * right attinhcount.
-        */
-       if (recurse)
-       {
-               AlterTableCmd *childCmd = copyObject(cmd);
-               ColumnDef  *colDefChild = (ColumnDef *) childCmd->def;
-
-               /* Child should see column as singly inherited */
-               colDefChild->inhcount = 1;
-               colDefChild->is_local = false;
-
-               ATOneLevelRecursion(wqueue, rel, childCmd, lockmode);
-       }
-       else
-       {
-               /*
-                * If we are told not to recurse, there had better not be any child
-                * tables; else the addition would put them out of step.
-                */
-               if (find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                                        errmsg("column must be added to child tables too")));
-       }
-
        if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
                ATTypedTableRecursion(wqueue, rel, cmd, lockmode);
+
+       if (recurse)
+               cmd->subtype = AT_AddColumnRecurse;
 }
 
 static void
-ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
-                               ColumnDef *colDef, bool isOid, LOCKMODE lockmode)
+ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
+                               ColumnDef *colDef, bool isOid,
+                               bool recurse, bool recursing, LOCKMODE lockmode)
 {
        Oid                     myrelid = RelationGetRelid(rel);
        Relation        pgclass,
@@ -4121,12 +4081,20 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
        Oid                     collOid;
        Form_pg_type tform;
        Expr       *defval;
+       List       *children;
+       ListCell   *child;
+
+       /* At top level, permission check was done in ATPrepCmd, else do it */
+       if (recursing)
+               ATSimplePermissions(rel, ATT_TABLE);
 
        attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
 
        /*
         * Are we adding the column to a recursion child?  If so, check whether to
-        * merge with an existing definition for the column.
+        * merge with an existing definition for the column.  If we do merge,
+        * we must not recurse.  Children will already have the column, and
+     * recursing into them would mess up attinhcount.
         */
        if (colDef->inhcount > 0)
        {
@@ -4389,6 +4357,50 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
         * Add needed dependency entries for the new column.
         */
        add_column_datatype_dependency(myrelid, newattnum, attribute.atttypid, attribute.attcollation);
+
+       /*
+        * Propagate to children as appropriate.  Unlike most other ALTER
+        * routines, we have to do this one level of recursion at a time; we can't
+        * use find_all_inheritors to do it in one pass.
+        */
+       children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+
+       /*
+        * If we are told not to recurse, there had better not be any child
+        * tables; else the addition would put them out of step.
+        */
+       if (children && !recurse)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                                errmsg("column must be added to child tables too")));
+
+       /* Children should see column as singly inherited */
+       if (!recursing)
+       {
+               colDef = copyObject(colDef);
+               colDef->inhcount = 1;
+               colDef->is_local = false;
+       }
+
+       foreach(child, children)
+       {
+               Oid                     childrelid = lfirst_oid(child);
+               Relation        childrel;
+               AlteredTableInfo *childtab;
+
+               /* find_inheritance_children already got lock */
+               childrel = heap_open(childrelid, NoLock);
+               CheckTableNotInUse(childrel, "ALTER TABLE");
+
+               /* Find or create work queue entry for this table */
+               childtab = ATGetQueueEntry(wqueue, childrel);
+
+               /* Recurse to child */
+               ATExecAddColumn(wqueue, childtab, childrel,
+                                               colDef, isOid, recurse, true, lockmode);
+
+               heap_close(childrel, NoLock);
+       }
 }
 
 /*
@@ -4440,6 +4452,9 @@ ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd, LOC
                cmd->def = (Node *) cdef;
        }
        ATPrepAddColumn(wqueue, rel, recurse, false, cmd, lockmode);
+
+       if (recurse)
+               cmd->subtype = AT_AddOidsRecurse;
 }
 
 /*
index 670b12fc82dc5f857a4a83c6f1994d65b789729c..d9eac766f02fbac4f0e3163affd0e39f2cf184fa 100644 (file)
@@ -1173,6 +1173,7 @@ typedef struct AlterTableStmt
 typedef enum AlterTableType
 {
        AT_AddColumn,                           /* add column */
+       AT_AddColumnRecurse,            /* internal to commands/tablecmds.c */
        AT_AddColumnToView,                     /* implicitly via CREATE OR REPLACE VIEW */
        AT_ColumnDefault,                       /* alter column default */
        AT_DropNotNull,                         /* alter column drop not null */
@@ -1198,6 +1199,7 @@ typedef enum AlterTableType
        AT_ClusterOn,                           /* CLUSTER ON */
        AT_DropCluster,                         /* SET WITHOUT CLUSTER */
        AT_AddOids,                                     /* SET WITH OIDS */
+       AT_AddOidsRecurse,                      /* internal to commands/tablecmds.c */
        AT_DropOids,                            /* SET WITHOUT OIDS */
        AT_SetTableSpace,                       /* SET TABLESPACE */
        AT_SetRelOptions,                       /* SET (...) -- AM specific parameters */
index 578f94bc64ce87c8d06eb9406a7d86844a66605e..d7d1b646cfa37f747926260f02d9d281861eb3a1 100644 (file)
@@ -1198,6 +1198,23 @@ drop table p1, p2 cascade;
 NOTICE:  drop cascades to 2 other objects
 DETAIL:  drop cascades to table c1
 drop cascades to table gc1
+-- test attinhcount tracking with merged columns
+create table depth0();
+create table depth1(c text) inherits (depth0);
+create table depth2() inherits (depth1);
+alter table depth0 add c text;
+NOTICE:  merging definition of column "c" for child "depth1"
+select attrelid::regclass, attname, attinhcount, attislocal
+from pg_attribute
+where attnum > 0 and attrelid::regclass in ('depth0', 'depth1', 'depth2')
+order by attrelid::regclass::text, attnum;
+ attrelid | attname | attinhcount | attislocal 
+----------+---------+-------------+------------
+ depth0   | c       |           0 | t
+ depth1   | c       |           1 | t
+ depth2   | c       |           1 | f
+(3 rows)
+
 --
 -- Test the ALTER TABLE SET WITH/WITHOUT OIDS command
 --
index 54dbb8eaf9deb0d5072af22c5844b4be8bf05302..749584d9e636c9a7bd89936f40c4f227537499ff 100644 (file)
@@ -944,6 +944,18 @@ order by relname, attnum;
 
 drop table p1, p2 cascade;
 
+-- test attinhcount tracking with merged columns
+
+create table depth0();
+create table depth1(c text) inherits (depth0);
+create table depth2() inherits (depth1);
+alter table depth0 add c text;
+
+select attrelid::regclass, attname, attinhcount, attislocal
+from pg_attribute
+where attnum > 0 and attrelid::regclass in ('depth0', 'depth1', 'depth2')
+order by attrelid::regclass::text, attnum;
+
 --
 -- Test the ALTER TABLE SET WITH/WITHOUT OIDS command
 --