From 6c5723998594dffa5d47c3cf8c96ccf89c033aae Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Sun, 3 Apr 2011 21:52:47 -0400 Subject: [PATCH] Rearrange "add column" logic to merge columns at exec time. The previous coding set attinhcount too high in some cases, resulting in an undumpable, undroppable column. Per bug #5856, reported by Naoya Anzai. See also commit 31b6fc06d83c6de3644c8f2921eb7de0eb92fac3, which fixes a similar bug in ALTER TABLE .. ADD CONSTRAINT. Patch by Noah Misch. --- src/backend/commands/tablecmds.c | 161 ++++++++++++---------- src/include/nodes/parsenodes.h | 2 + src/test/regress/expected/alter_table.out | 17 +++ src/test/regress/sql/alter_table.sql | 12 ++ 4 files changed, 119 insertions(+), 73 deletions(-) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 737ab1a7bc..4a97819b01 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -285,16 +285,15 @@ static void ATSimplePermissions(Relation rel, int allowed_targets); static void ATWrongRelkindError(Relation rel, int allowed_targets); static void ATSimpleRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode); -static void ATOneLevelRecursion(List **wqueue, Relation rel, - AlterTableCmd *cmd, LOCKMODE lockmode); static void ATTypedTableRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd, LOCKMODE lockmode); static List *find_typed_table_dependencies(Oid typeOid, const char *typeName, DropBehavior behavior); static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, AlterTableCmd *cmd, LOCKMODE lockmode); -static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel, - ColumnDef *colDef, bool isOid, LOCKMODE lockmode); +static void ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, + ColumnDef *colDef, bool isOid, + bool recurse, bool recursing, LOCKMODE lockmode); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid, Oid collid); static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd, LOCKMODE lockmode); @@ -2775,15 +2774,15 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, case AT_AddColumn: /* ADD COLUMN */ ATSimplePermissions(rel, ATT_TABLE|ATT_COMPOSITE_TYPE|ATT_FOREIGN_TABLE); - /* Performs own recursion */ ATPrepAddColumn(wqueue, rel, recurse, recursing, cmd, lockmode); + /* Recursion occurs during execution phase */ pass = AT_PASS_ADD_COL; break; case AT_AddColumnToView: /* add column via CREATE OR REPLACE * VIEW */ ATSimplePermissions(rel, ATT_VIEW); - /* Performs own recursion */ ATPrepAddColumn(wqueue, rel, recurse, recursing, cmd, lockmode); + /* Recursion occurs during execution phase */ pass = AT_PASS_ADD_COL; break; case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ @@ -2885,9 +2884,9 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_AddOids: /* SET WITH OIDS */ ATSimplePermissions(rel, ATT_TABLE); - /* Performs own recursion */ if (!rel->rd_rel->relhasoids || recursing) ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode); + /* Recursion occurs during execution phase */ pass = AT_PASS_ADD_COL; break; case AT_DropOids: /* SET WITHOUT OIDS */ @@ -3043,7 +3042,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_AddColumn: /* ADD COLUMN */ case AT_AddColumnToView: /* add column via CREATE OR REPLACE * VIEW */ - ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, false, lockmode); + ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def, + false, false, false, lockmode); + break; + case AT_AddColumnRecurse: + ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def, + false, true, false, lockmode); break; case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ ATExecColumnDefault(rel, cmd->name, cmd->def, lockmode); @@ -3121,7 +3125,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_AddOids: /* SET WITH OIDS */ /* Use the ADD COLUMN code, unless prep decided to do nothing */ if (cmd->def != NULL) - ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, true, lockmode); + ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def, + true, false, false, lockmode); + break; + case AT_AddOidsRecurse: /* SET WITH OIDS */ + /* Use the ADD COLUMN code, unless prep decided to do nothing */ + if (cmd->def != NULL) + ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def, + true, true, false, lockmode); break; case AT_DropOids: /* SET WITHOUT OIDS */ @@ -3842,37 +3853,6 @@ ATSimpleRecursion(List **wqueue, Relation rel, } } -/* - * ATOneLevelRecursion - * - * Here, we visit only direct inheritance children. It is expected that - * the command's prep routine will recurse again to find indirect children. - * When using this technique, a multiply-inheriting child will be visited - * multiple times. - */ -static void -ATOneLevelRecursion(List **wqueue, Relation rel, - AlterTableCmd *cmd, LOCKMODE lockmode) -{ - Oid relid = RelationGetRelid(rel); - ListCell *child; - List *children; - - children = find_inheritance_children(relid, lockmode); - - foreach(child, children) - { - Oid childrelid = lfirst_oid(child); - Relation childrel; - - /* find_inheritance_children already got lock */ - childrel = relation_open(childrelid, NoLock); - CheckTableNotInUse(childrel, "ALTER TABLE"); - ATPrepCmd(wqueue, childrel, cmd, true, true, lockmode); - relation_close(childrel, NoLock); - } -} - /* * ATTypedTableRecursion * @@ -4060,6 +4040,12 @@ find_typed_table_dependencies(Oid typeOid, const char *typeName, DropBehavior be * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the * AT_AddColumn AlterTableCmd by parse_utilcmd.c and added as independent * AlterTableCmd's. + * + * ADD COLUMN cannot use the normal ALTER TABLE recursion mechanism, because we + * have to decide at runtime whether to recurse or not depending on whether we + * actually add a column or merely merge with an existing column. (We can't + * check this in a static pre-pass because it won't handle multiple inheritance + * situations correctly.) */ static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, @@ -4070,43 +4056,17 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot add column to typed table"))); - /* - * Recurse to add the column to child classes, if requested. - * - * We must recurse one level at a time, so that multiply-inheriting - * children are visited the right number of times and end up with the - * right attinhcount. - */ - if (recurse) - { - AlterTableCmd *childCmd = copyObject(cmd); - ColumnDef *colDefChild = (ColumnDef *) childCmd->def; - - /* Child should see column as singly inherited */ - colDefChild->inhcount = 1; - colDefChild->is_local = false; - - ATOneLevelRecursion(wqueue, rel, childCmd, lockmode); - } - else - { - /* - * If we are told not to recurse, there had better not be any child - * tables; else the addition would put them out of step. - */ - if (find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column must be added to child tables too"))); - } - if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE) ATTypedTableRecursion(wqueue, rel, cmd, lockmode); + + if (recurse) + cmd->subtype = AT_AddColumnRecurse; } static void -ATExecAddColumn(AlteredTableInfo *tab, Relation rel, - ColumnDef *colDef, bool isOid, LOCKMODE lockmode) +ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, + ColumnDef *colDef, bool isOid, + bool recurse, bool recursing, LOCKMODE lockmode) { Oid myrelid = RelationGetRelid(rel); Relation pgclass, @@ -4121,12 +4081,20 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, Oid collOid; Form_pg_type tform; Expr *defval; + List *children; + ListCell *child; + + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(rel, ATT_TABLE); attrdesc = heap_open(AttributeRelationId, RowExclusiveLock); /* * Are we adding the column to a recursion child? If so, check whether to - * merge with an existing definition for the column. + * merge with an existing definition for the column. If we do merge, + * we must not recurse. Children will already have the column, and + * recursing into them would mess up attinhcount. */ if (colDef->inhcount > 0) { @@ -4389,6 +4357,50 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, * Add needed dependency entries for the new column. */ add_column_datatype_dependency(myrelid, newattnum, attribute.atttypid, attribute.attcollation); + + /* + * Propagate to children as appropriate. Unlike most other ALTER + * routines, we have to do this one level of recursion at a time; we can't + * use find_all_inheritors to do it in one pass. + */ + children = find_inheritance_children(RelationGetRelid(rel), lockmode); + + /* + * If we are told not to recurse, there had better not be any child + * tables; else the addition would put them out of step. + */ + if (children && !recurse) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column must be added to child tables too"))); + + /* Children should see column as singly inherited */ + if (!recursing) + { + colDef = copyObject(colDef); + colDef->inhcount = 1; + colDef->is_local = false; + } + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + AlteredTableInfo *childtab; + + /* find_inheritance_children already got lock */ + childrel = heap_open(childrelid, NoLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); + + /* Find or create work queue entry for this table */ + childtab = ATGetQueueEntry(wqueue, childrel); + + /* Recurse to child */ + ATExecAddColumn(wqueue, childtab, childrel, + colDef, isOid, recurse, true, lockmode); + + heap_close(childrel, NoLock); + } } /* @@ -4440,6 +4452,9 @@ ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd, LOC cmd->def = (Node *) cdef; } ATPrepAddColumn(wqueue, rel, recurse, false, cmd, lockmode); + + if (recurse) + cmd->subtype = AT_AddOidsRecurse; } /* diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 670b12fc82..d9eac766f0 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1173,6 +1173,7 @@ typedef struct AlterTableStmt typedef enum AlterTableType { AT_AddColumn, /* add column */ + AT_AddColumnRecurse, /* internal to commands/tablecmds.c */ AT_AddColumnToView, /* implicitly via CREATE OR REPLACE VIEW */ AT_ColumnDefault, /* alter column default */ AT_DropNotNull, /* alter column drop not null */ @@ -1198,6 +1199,7 @@ typedef enum AlterTableType AT_ClusterOn, /* CLUSTER ON */ AT_DropCluster, /* SET WITHOUT CLUSTER */ AT_AddOids, /* SET WITH OIDS */ + AT_AddOidsRecurse, /* internal to commands/tablecmds.c */ AT_DropOids, /* SET WITHOUT OIDS */ AT_SetTableSpace, /* SET TABLESPACE */ AT_SetRelOptions, /* SET (...) -- AM specific parameters */ diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out index 578f94bc64..d7d1b646cf 100644 --- a/src/test/regress/expected/alter_table.out +++ b/src/test/regress/expected/alter_table.out @@ -1198,6 +1198,23 @@ drop table p1, p2 cascade; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table c1 drop cascades to table gc1 +-- test attinhcount tracking with merged columns +create table depth0(); +create table depth1(c text) inherits (depth0); +create table depth2() inherits (depth1); +alter table depth0 add c text; +NOTICE: merging definition of column "c" for child "depth1" +select attrelid::regclass, attname, attinhcount, attislocal +from pg_attribute +where attnum > 0 and attrelid::regclass in ('depth0', 'depth1', 'depth2') +order by attrelid::regclass::text, attnum; + attrelid | attname | attinhcount | attislocal +----------+---------+-------------+------------ + depth0 | c | 0 | t + depth1 | c | 1 | t + depth2 | c | 1 | f +(3 rows) + -- -- Test the ALTER TABLE SET WITH/WITHOUT OIDS command -- diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql index 54dbb8eaf9..749584d9e6 100644 --- a/src/test/regress/sql/alter_table.sql +++ b/src/test/regress/sql/alter_table.sql @@ -944,6 +944,18 @@ order by relname, attnum; drop table p1, p2 cascade; +-- test attinhcount tracking with merged columns + +create table depth0(); +create table depth1(c text) inherits (depth0); +create table depth2() inherits (depth1); +alter table depth0 add c text; + +select attrelid::regclass, attname, attinhcount, attislocal +from pg_attribute +where attnum > 0 and attrelid::regclass in ('depth0', 'depth1', 'depth2') +order by attrelid::regclass::text, attnum; + -- -- Test the ALTER TABLE SET WITH/WITHOUT OIDS command -- -- 2.40.0