From afc73284575d102048d850eb0b5fd66e036d3b54 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sat, 30 Jun 2012 16:44:09 -0400 Subject: [PATCH] Prevent CREATE TABLE LIKE/INHERITS from (mis) copying whole-row Vars. If a CHECK constraint or index definition contained a whole-row Var (that is, "table.*"), an attempt to copy that definition via CREATE TABLE LIKE or table inheritance produced incorrect results: the copied Var still claimed to have the rowtype of the source table, rather than the created table. For the LIKE case, it seems reasonable to just throw error for this situation, since the point of LIKE is that the new table is not permanently coupled to the old, so there's no reason to assume its rowtype will stay compatible. In the inheritance case, we should ideally allow such constraints, but doing so will require nontrivial refactoring of CREATE TABLE processing (because we'd need to know the OID of the new table's rowtype before we adjust inherited CHECK constraints). In view of the lack of previous complaints, that doesn't seem worth the risk in a back-patched bug fix, so just make it throw error for the inheritance case as well. Along the way, replace change_varattnos_of_a_node() with a more robust function map_variable_attnos(), which is capable of being extended to handle insertion of ConvertRowtypeExpr whenever we get around to fixing the inheritance case nicely, and in the meantime it returns a failure indication to the caller so that a helpful message with some context can be thrown. Also, this code will do the right thing with subselects (if we ever allow them in CHECK or indexes), and it range-checks varattnos before using them to index into the map array. Per report from Sergey Konoplev. Back-patch to all supported branches. --- src/backend/commands/tablecmds.c | 131 +++++------------------------ src/backend/parser/parse_utilcmd.c | 84 +++++++++++++++--- src/backend/rewrite/rewriteManip.c | 113 +++++++++++++++++++++++++ src/include/commands/tablecmds.h | 4 - src/include/rewrite/rewriteManip.h | 5 ++ 5 files changed, 210 insertions(+), 127 deletions(-) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 12c94e1993..91c2c6d51b 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -62,6 +62,7 @@ #include "parser/parser.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" +#include "rewrite/rewriteManip.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/smgr.h" @@ -225,7 +226,6 @@ static void truncate_check_rel(Relation rel); static List *MergeAttributes(List *schema, List *supers, bool istemp, List **supOids, List **supconstr, int *supOidCount); static bool MergeCheckConstraint(List *constraints, char *name, Node *expr); -static bool change_varattnos_walker(Node *node, const AttrNumber *newattno); static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); static void StoreCatalogInheritance(Oid relationId, List *supers); @@ -1353,7 +1353,7 @@ MergeAttributes(List *schema, List *supers, bool istemp, * parents after the first one, nor if we have dropped columns.) */ newattno = (AttrNumber *) - palloc(tupleDesc->natts * sizeof(AttrNumber)); + palloc0(tupleDesc->natts * sizeof(AttrNumber)); for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) @@ -1367,15 +1367,7 @@ MergeAttributes(List *schema, List *supers, bool istemp, * Ignore dropped columns in the parent. */ if (attribute->attisdropped) - { - /* - * change_varattnos_of_a_node asserts that this is greater - * than zero, so if anything tries to use it, we should find - * out. - */ - newattno[parent_attno - 1] = 0; - continue; - } + continue; /* leave newattno entry as zero */ /* * Does it conflict with some previously inherited column? @@ -1500,10 +1492,26 @@ MergeAttributes(List *schema, List *supers, bool istemp, { char *name = check[i].ccname; Node *expr; + bool found_whole_row; + + /* Adjust Vars to match new table's column numbering */ + expr = map_variable_attnos(stringToNode(check[i].ccbin), + 1, 0, + newattno, tupleDesc->natts, + &found_whole_row); - /* adjust varattnos of ccbin here */ - expr = stringToNode(check[i].ccbin); - change_varattnos_of_a_node(expr, newattno); + /* + * For the moment we have to reject whole-row variables. + * We could convert them, if we knew the new table's rowtype + * OID, but that hasn't been assigned yet. + */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".", + name, + RelationGetRelationName(relation)))); /* check for duplicate */ if (!MergeCheckConstraint(constraints, name, expr)) @@ -1692,101 +1700,6 @@ MergeCheckConstraint(List *constraints, char *name, Node *expr) } -/* - * Replace varattno values in an expression tree according to the given - * map array, that is, varattno N is replaced by newattno[N-1]. It is - * caller's responsibility to ensure that the array is long enough to - * define values for all user varattnos present in the tree. System column - * attnos remain unchanged. - * - * Note that the passed node tree is modified in-place! - */ -void -change_varattnos_of_a_node(Node *node, const AttrNumber *newattno) -{ - /* no setup needed, so away we go */ - (void) change_varattnos_walker(node, newattno); -} - -static bool -change_varattnos_walker(Node *node, const AttrNumber *newattno) -{ - if (node == NULL) - return false; - if (IsA(node, Var)) - { - Var *var = (Var *) node; - - if (var->varlevelsup == 0 && var->varno == 1 && - var->varattno > 0) - { - /* - * ??? the following may be a problem when the node is multiply - * referenced though stringToNode() doesn't create such a node - * currently. - */ - Assert(newattno[var->varattno - 1] > 0); - var->varattno = var->varoattno = newattno[var->varattno - 1]; - } - return false; - } - return expression_tree_walker(node, change_varattnos_walker, - (void *) newattno); -} - -/* - * Generate a map for change_varattnos_of_a_node from old and new TupleDesc's, - * matching according to column name. - */ -AttrNumber * -varattnos_map(TupleDesc olddesc, TupleDesc newdesc) -{ - AttrNumber *attmap; - int i, - j; - - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * olddesc->natts); - for (i = 1; i <= olddesc->natts; i++) - { - if (olddesc->attrs[i - 1]->attisdropped) - continue; /* leave the entry as zero */ - - for (j = 1; j <= newdesc->natts; j++) - { - if (strcmp(NameStr(olddesc->attrs[i - 1]->attname), - NameStr(newdesc->attrs[j - 1]->attname)) == 0) - { - attmap[i - 1] = j; - break; - } - } - } - return attmap; -} - -/* - * Generate a map for change_varattnos_of_a_node from a TupleDesc and a list - * of ColumnDefs - */ -AttrNumber * -varattnos_map_schema(TupleDesc old, List *schema) -{ - AttrNumber *attmap; - int i; - - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts); - for (i = 1; i <= old->natts; i++) - { - if (old->attrs[i - 1]->attisdropped) - continue; /* leave the entry as zero */ - - attmap[i - 1] = findAttrByName(NameStr(old->attrs[i - 1]->attname), - schema); - } - return attmap; -} - - /* * StoreCatalogInheritance * Updates the system catalogs with proper inheritance information. diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index d1e1dac3a1..afd38c3ef9 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -109,7 +109,8 @@ static void transformOfType(ParseState *pstate, CreateStmtContext *cxt, TypeName *ofTypename); static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt); static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, - Relation parent_index, AttrNumber *attmap); + Relation source_idx, + const AttrNumber *attmap, int attmap_length); static List *get_opclass(Oid opclass, Oid actual_datatype); static void transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt); @@ -579,6 +580,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, Relation relation; TupleDesc tupleDesc; TupleConstr *constr; + AttrNumber *attmap; AclResult aclresult; char *comment; @@ -602,6 +604,13 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, tupleDesc = RelationGetDescr(relation); constr = tupleDesc->constr; + /* + * Initialize column number map for map_variable_attnos(). We need this + * since dropped columns in the source table aren't copied, so the new + * table can have different column numbers. + */ + attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * tupleDesc->natts); + /* * Insert the copied attributes into the cxt for the new table definition. */ @@ -613,7 +622,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, ColumnDef *def; /* - * Ignore dropped columns in the parent. + * Ignore dropped columns in the parent. attmap entry is left zero. */ if (attribute->attisdropped) continue; @@ -640,6 +649,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, */ cxt->columns = lappend(cxt->columns, def); + attmap[parent_attno - 1] = list_length(cxt->columns); + /* * Copy default, if present and the default has been requested */ @@ -698,22 +709,39 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, /* * Copy CHECK constraints if requested, being careful to adjust attribute - * numbers + * numbers so they match the child. */ if ((inhRelation->options & CREATE_TABLE_LIKE_CONSTRAINTS) && tupleDesc->constr) { - AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); int ccnum; for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++) { char *ccname = tupleDesc->constr->check[ccnum].ccname; char *ccbin = tupleDesc->constr->check[ccnum].ccbin; - Node *ccbin_node = stringToNode(ccbin); Constraint *n = makeNode(Constraint); + Node *ccbin_node; + bool found_whole_row; + + ccbin_node = map_variable_attnos(stringToNode(ccbin), + 1, 0, + attmap, tupleDesc->natts, + &found_whole_row); - change_varattnos_of_a_node(ccbin_node, attmap); + /* + * We reject whole-row variables because the whole point of LIKE + * is that the new table's rowtype might later diverge from the + * parent's. So, while translation might be possible right now, + * it wouldn't be possible to guarantee it would work in future. + */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".", + ccname, + RelationGetRelationName(relation)))); n->contype = CONSTR_CHECK; n->location = -1; @@ -749,7 +777,6 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, if ((inhRelation->options & CREATE_TABLE_LIKE_INDEXES) && relation->rd_rel->relhasindex) { - AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); List *parent_indexes; ListCell *l; @@ -764,7 +791,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ - index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap); + index_stmt = generateClonedIndexStmt(cxt, parent_index, + attmap, tupleDesc->natts); /* Copy comment on index */ if (inhRelation->options & CREATE_TABLE_LIKE_COMMENTS) @@ -879,7 +907,7 @@ chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt) */ static IndexStmt * generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, - AttrNumber *attmap) + const AttrNumber *attmap, int attmap_length) { Oid source_relid = RelationGetRelid(source_idx); Form_pg_attribute *attrs = RelationGetDescr(source_idx)->attrs; @@ -1059,14 +1087,26 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, { /* Expressional index */ Node *indexkey; + bool found_whole_row; if (indexpr_item == NULL) elog(ERROR, "too few entries in indexprs list"); indexkey = (Node *) lfirst(indexpr_item); indexpr_item = lnext(indexpr_item); - /* OK to modify indexkey since we are working on a private copy */ - change_varattnos_of_a_node(indexkey, attmap); + /* Adjust Vars to match new table's column numbering */ + indexkey = map_variable_attnos(indexkey, + 1, 0, + attmap, attmap_length, + &found_whole_row); + + /* As in transformTableLikeClause, reject whole-row variables */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Index \"%s\" contains a whole-row table reference.", + RelationGetRelationName(source_idx)))); iparam->name = NULL; iparam->expr = indexkey; @@ -1120,12 +1160,28 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, if (!isnull) { char *pred_str; + Node *pred_tree; + bool found_whole_row; /* Convert text string to node tree */ pred_str = TextDatumGetCString(datum); - index->whereClause = (Node *) stringToNode(pred_str); - /* Adjust attribute numbers */ - change_varattnos_of_a_node(index->whereClause, attmap); + pred_tree = (Node *) stringToNode(pred_str); + + /* Adjust Vars to match new table's column numbering */ + pred_tree = map_variable_attnos(pred_tree, + 1, 0, + attmap, attmap_length, + &found_whole_row); + + /* As in transformTableLikeClause, reject whole-row variables */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Index \"%s\" contains a whole-row table reference.", + RelationGetRelationName(source_idx)))); + + index->whereClause = pred_tree; } /* Clean up */ diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c index 724e94b913..ce0f8c6bf6 100644 --- a/src/backend/rewrite/rewriteManip.c +++ b/src/backend/rewrite/rewriteManip.c @@ -1198,6 +1198,119 @@ replace_rte_variables_mutator(Node *node, } +/* + * map_variable_attnos() finds all user-column Vars in an expression tree + * that reference a particular RTE, and adjusts their varattnos according + * to the given mapping array (varattno n is replaced by attno_map[n-1]). + * Vars for system columns are not modified. + * + * A zero in the mapping array represents a dropped column, which should not + * appear in the expression. + * + * If the expression tree contains a whole-row Var for the target RTE, + * the Var is not changed but *found_whole_row is returned as TRUE. + * For most callers this is an error condition, but we leave it to the caller + * to report the error so that useful context can be provided. (In some + * usages it would be appropriate to modify the Var's vartype and insert a + * ConvertRowtypeExpr node to map back to the original vartype. We might + * someday extend this function's API to support that. For now, the only + * concession to that future need is that this function is a tree mutator + * not just a walker.) + * + * This could be built using replace_rte_variables and a callback function, + * but since we don't ever need to insert sublinks, replace_rte_variables is + * overly complicated. + */ + +typedef struct +{ + int target_varno; /* RTE index to search for */ + int sublevels_up; /* (current) nesting depth */ + const AttrNumber *attno_map; /* map array for user attnos */ + int map_length; /* number of entries in attno_map[] */ + bool *found_whole_row; /* output flag */ +} map_variable_attnos_context; + +static Node * +map_variable_attnos_mutator(Node *node, + map_variable_attnos_context *context) +{ + if (node == NULL) + return NULL; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + if (var->varno == context->target_varno && + var->varlevelsup == context->sublevels_up) + { + /* Found a matching variable, make the substitution */ + Var *newvar = (Var *) palloc(sizeof(Var)); + int attno = var->varattno; + + *newvar = *var; + if (attno > 0) + { + /* user-defined column, replace attno */ + if (attno > context->map_length || + context->attno_map[attno - 1] == 0) + elog(ERROR, "unexpected varattno %d in expression to be mapped", + attno); + newvar->varattno = newvar->varoattno = context->attno_map[attno - 1]; + } + else if (attno == 0) + { + /* whole-row variable, warn caller */ + *(context->found_whole_row) = true; + } + return (Node *) newvar; + } + /* otherwise fall through to copy the var normally */ + } + else if (IsA(node, Query)) + { + /* Recurse into RTE subquery or not-yet-planned sublink subquery */ + Query *newnode; + + context->sublevels_up++; + newnode = query_tree_mutator((Query *) node, + map_variable_attnos_mutator, + (void *) context, + 0); + context->sublevels_up--; + return (Node *) newnode; + } + return expression_tree_mutator(node, map_variable_attnos_mutator, + (void *) context); +} + +Node * +map_variable_attnos(Node *node, + int target_varno, int sublevels_up, + const AttrNumber *attno_map, int map_length, + bool *found_whole_row) +{ + map_variable_attnos_context context; + + context.target_varno = target_varno; + context.sublevels_up = sublevels_up; + context.attno_map = attno_map; + context.map_length = map_length; + context.found_whole_row = found_whole_row; + + *found_whole_row = false; + + /* + * Must be prepared to start with a Query or a bare expression tree; if + * it's a Query, we don't want to increment sublevels_up. + */ + return query_or_expression_tree_mutator(node, + map_variable_attnos_mutator, + (void *) &context, + 0); +} + + /* * ResolveNew - replace Vars with corresponding items from a targetlist * diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index c1774a2b37..81d0a2927b 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -57,10 +57,6 @@ extern void find_composite_type_dependencies(Oid typeOid, const char *origTblName, const char *origTypeName); -extern AttrNumber *varattnos_map(TupleDesc olddesc, TupleDesc newdesc); -extern AttrNumber *varattnos_map_schema(TupleDesc old, List *schema); -extern void change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); - extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid); diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h index 0e48129d82..d22c7af613 100644 --- a/src/include/rewrite/rewriteManip.h +++ b/src/include/rewrite/rewriteManip.h @@ -65,6 +65,11 @@ extern Node *replace_rte_variables(Node *node, extern Node *replace_rte_variables_mutator(Node *node, replace_rte_variables_context *context); +extern Node *map_variable_attnos(Node *node, + int target_varno, int sublevels_up, + const AttrNumber *attno_map, int map_length, + bool *found_whole_row); + extern Node *ResolveNew(Node *node, int target_varno, int sublevels_up, RangeTblEntry *target_rte, List *targetlist, int event, int update_varno, -- 2.40.0