]> granicus.if.org Git - postgresql/commitdiff
Make INSERT-from-multiple-VALUES-rows handle targetlist indirection better.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 3 Aug 2016 20:37:03 +0000 (16:37 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 3 Aug 2016 20:37:03 +0000 (16:37 -0400)
Previously, if an INSERT with multiple rows of VALUES had indirection
(array subscripting or field selection) in its target-columns list, the
parser handled that by applying transformAssignedExpr() to each element
of each VALUES row independently.  This led to having ArrayRef assignment
nodes or FieldStore nodes in each row of the VALUES RTE.  That works for
simple cases, but in bug #14265 Nuri Boardman points out that it fails
if there are multiple assignments to elements/fields of the same target
column.  For such cases to work, rewriteTargetListIU() has to nest the
ArrayRefs or FieldStores together to produce a single expression to be
assigned to the column.  But it failed to find them in the top-level
targetlist and issued an error about "multiple assignments to same column".

We could possibly fix this by teaching the rewriter to apply
rewriteTargetListIU to each VALUES row separately, but that would be messy
(it would change the output rowtype of the VALUES RTE, for example) and
inefficient.  Instead, let's fix the parser so that the VALUES RTE outputs
are just the user-specified values, cast to the right type if necessary,
and then the ArrayRefs or FieldStores are applied in the top-level
targetlist to Vars representing the RTE's outputs.  This is the same
parsetree representation already used for similar cases with INSERT/SELECT
syntax, so it allows simplifications in ruleutils.c, which no longer needs
to treat INSERT-from-multiple-VALUES as its own special case.

This implementation works by applying transformAssignedExpr to the VALUES
entries as before, and then stripping off any ArrayRefs or FieldStores it
adds.  With lots of VALUES rows it would be noticeably more efficient to
not add those nodes in the first place.  But that's just an optimization
not a bug fix, and there doesn't seem to be any good way to do it without
significant refactoring.  (A non-invasive answer would be to apply
transformAssignedExpr + stripping to just the first VALUES row, and then
just forcibly cast remaining rows to the same data types exposed in the
first row.  But this way would lead to different, not-INSERT-specific
errors being reported in casting failure cases, so it doesn't seem very
nice.)  So leave that for later; this patch at least isn't making the
per-row parsing work worse, and it does make the finished parsetree
smaller, saving rewriter and planner work.

Catversion bump because stored rules containing such INSERTs would need
to change.  Because of that, no back-patch, even though this is a very
long-standing bug.

Report: <20160727005725.7438.26021@wrigleys.postgresql.org>
Discussion: <9578.1469645245@sss.pgh.pa.us>

src/backend/parser/analyze.c
src/backend/utils/adt/ruleutils.c
src/include/catalog/catversion.h
src/test/regress/expected/insert.out
src/test/regress/sql/insert.sql

index 29c8c4e94c73ee88124657a88fe505905cbd0573..eac86cce3ee595077af91fe1dc15db0b10435899 100644 (file)
@@ -51,7 +51,8 @@ post_parse_analyze_hook_type post_parse_analyze_hook = NULL;
 static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt);
 static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt);
 static List *transformInsertRow(ParseState *pstate, List *exprlist,
-                                  List *stmtcols, List *icolumns, List *attrnos);
+                                  List *stmtcols, List *icolumns, List *attrnos,
+                                  bool strip_indirection);
 static OnConflictExpr *transformOnConflictClause(ParseState *pstate,
                                                  OnConflictClause *onConflictClause);
 static int     count_rowexpr_columns(ParseState *pstate, Node *expr);
@@ -619,7 +620,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
                /* Prepare row for assignment to target table */
                exprList = transformInsertRow(pstate, exprList,
                                                                          stmt->cols,
-                                                                         icolumns, attrnos);
+                                                                         icolumns, attrnos,
+                                                                         false);
        }
        else if (list_length(selectStmt->valuesLists) > 1)
        {
@@ -663,10 +665,20 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
                                                                                        exprLocation((Node *) sublist))));
                        }
 
-                       /* Prepare row for assignment to target table */
+                       /*
+                        * Prepare row for assignment to target table.  We process any
+                        * indirection on the target column specs normally but then strip
+                        * off the resulting field/array assignment nodes, since we don't
+                        * want the parsed statement to contain copies of those in each
+                        * VALUES row.  (It's annoying to have to transform the
+                        * indirection specs over and over like this, but avoiding it
+                        * would take some really messy refactoring of
+                        * transformAssignmentIndirection.)
+                        */
                        sublist = transformInsertRow(pstate, sublist,
                                                                                 stmt->cols,
-                                                                                icolumns, attrnos);
+                                                                                icolumns, attrnos,
+                                                                                true);
 
                        /*
                         * We must assign collations now because assign_query_collations
@@ -717,6 +729,14 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
                 * Generate list of Vars referencing the RTE
                 */
                expandRTE(rte, rtr->rtindex, 0, -1, false, NULL, &exprList);
+
+               /*
+                * Re-apply any indirection on the target column specs to the Vars
+                */
+               exprList = transformInsertRow(pstate, exprList,
+                                                                         stmt->cols,
+                                                                         icolumns, attrnos,
+                                                                         false);
        }
        else
        {
@@ -739,7 +759,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
                /* Prepare row for assignment to target table */
                exprList = transformInsertRow(pstate, exprList,
                                                                          stmt->cols,
-                                                                         icolumns, attrnos);
+                                                                         icolumns, attrnos,
+                                                                         false);
        }
 
        /*
@@ -808,12 +829,17 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
 /*
  * Prepare an INSERT row for assignment to the target table.
  *
- * The row might be either a VALUES row, or variables referencing a
- * sub-SELECT output.
+ * exprlist: transformed expressions for source values; these might come from
+ * a VALUES row, or be Vars referencing a sub-SELECT or VALUES RTE output.
+ * stmtcols: original target-columns spec for INSERT (we just test for NIL)
+ * icolumns: effective target-columns spec (list of ResTarget)
+ * attrnos: integer column numbers (must be same length as icolumns)
+ * strip_indirection: if true, remove any field/array assignment nodes
  */
 static List *
 transformInsertRow(ParseState *pstate, List *exprlist,
-                                  List *stmtcols, List *icolumns, List *attrnos)
+                                  List *stmtcols, List *icolumns, List *attrnos,
+                                  bool strip_indirection)
 {
        List       *result;
        ListCell   *lc;
@@ -879,6 +905,29 @@ transformInsertRow(ParseState *pstate, List *exprlist,
                                                                         col->indirection,
                                                                         col->location);
 
+               if (strip_indirection)
+               {
+                       while (expr)
+                       {
+                               if (IsA(expr, FieldStore))
+                               {
+                                       FieldStore *fstore = (FieldStore *) expr;
+
+                                       expr = (Expr *) linitial(fstore->newvals);
+                               }
+                               else if (IsA(expr, ArrayRef))
+                               {
+                                       ArrayRef   *aref = (ArrayRef *) expr;
+
+                                       if (aref->refassgnexpr == NULL)
+                                               break;
+                                       expr = aref->refassgnexpr;
+                               }
+                               else
+                                       break;
+                       }
+               }
+
                result = lappend(result, expr);
 
                icols = lnext(icols);
index 80f0def75d56122e6f6b28c5d8cc2aa6f321025c..d68ca7a7dc010de4494cbd6d5b600e297d91d527 100644 (file)
@@ -438,8 +438,7 @@ static void get_tablesample_def(TableSampleClause *tablesample,
                                        deparse_context *context);
 static void get_opclass_name(Oid opclass, Oid actual_datatype,
                                 StringInfo buf);
-static Node *processIndirection(Node *node, deparse_context *context,
-                                  bool printit);
+static Node *processIndirection(Node *node, deparse_context *context);
 static void printSubscripts(ArrayRef *aref, deparse_context *context);
 static char *get_relation_name(Oid relid);
 static char *generate_relation_name(Oid relid, List *namespaces);
@@ -4551,11 +4550,9 @@ get_values_def(List *values_lists, deparse_context *context)
                                appendStringInfoChar(buf, ',');
 
                        /*
-                        * Strip any top-level nodes representing indirection assignments,
-                        * then print the result.  Whole-row Vars need special treatment.
+                        * Print the value.  Whole-row Vars need special treatment.
                         */
-                       get_rule_expr_toplevel(processIndirection(col, context, false),
-                                                                  context, false);
+                       get_rule_expr_toplevel(col, context, false);
                }
                appendStringInfoChar(buf, ')');
        }
@@ -5512,7 +5509,6 @@ get_insert_query_def(Query *query, deparse_context *context)
        RangeTblEntry *values_rte = NULL;
        RangeTblEntry *rte;
        char       *sep;
-       ListCell   *values_cell;
        ListCell   *l;
        List       *strippedexprs;
 
@@ -5563,17 +5559,9 @@ get_insert_query_def(Query *query, deparse_context *context)
                                                 quote_identifier(rte->alias->aliasname));
 
        /*
-        * Add the insert-column-names list.  To handle indirection properly, we
-        * need to look for indirection nodes in the top targetlist (if it's
-        * INSERT ... SELECT or INSERT ... single VALUES), or in the first
-        * expression list of the VALUES RTE (if it's INSERT ... multi VALUES). We
-        * assume that all the expression lists will have similar indirection in
-        * the latter case.
+        * Add the insert-column-names list.  Any indirection decoration needed on
+        * the column names can be inferred from the top targetlist.
         */
-       if (values_rte)
-               values_cell = list_head((List *) linitial(values_rte->values_lists));
-       else
-               values_cell = NULL;
        strippedexprs = NIL;
        sep = "";
        if (query->targetList)
@@ -5599,20 +5587,14 @@ get_insert_query_def(Query *query, deparse_context *context)
                /*
                 * Print any indirection needed (subfields or subscripts), and strip
                 * off the top-level nodes representing the indirection assignments.
+                * Add the stripped expressions to strippedexprs.  (If it's a
+                * single-VALUES statement, the stripped expressions are the VALUES to
+                * print below.  Otherwise they're just Vars and not really
+                * interesting.)
                 */
-               if (values_cell)
-               {
-                       /* we discard the stripped expression in this case */
-                       processIndirection((Node *) lfirst(values_cell), context, true);
-                       values_cell = lnext(values_cell);
-               }
-               else
-               {
-                       /* we keep a list of the stripped expressions in this case */
-                       strippedexprs = lappend(strippedexprs,
-                                                                       processIndirection((Node *) tle->expr,
-                                                                                                          context, true));
-               }
+               strippedexprs = lappend(strippedexprs,
+                                                               processIndirection((Node *) tle->expr,
+                                                                                                  context));
        }
        if (query->targetList)
                appendStringInfoString(buf, ") ");
@@ -5891,7 +5873,7 @@ get_update_query_targetlist_def(Query *query, List *targetList,
                 * Print any indirection needed (subfields or subscripts), and strip
                 * off the top-level nodes representing the indirection assignments.
                 */
-               expr = processIndirection((Node *) tle->expr, context, true);
+               expr = processIndirection((Node *) tle->expr, context);
 
                /*
                 * If we're in a multiassignment, skip printing anything more, unless
@@ -7296,7 +7278,7 @@ get_rule_expr(Node *node, deparse_context *context,
                                         * subscripting in immediate descendants.  It returns the
                                         * RHS expr that is actually being "assigned".
                                         */
-                                       refassgnexpr = processIndirection(node, context, true);
+                                       refassgnexpr = processIndirection(node, context);
                                        appendStringInfoString(buf, " := ");
                                        get_rule_expr(refassgnexpr, context, showimplicit);
                                }
@@ -9561,12 +9543,12 @@ get_opclass_name(Oid opclass, Oid actual_datatype,
  * processIndirection - take care of array and subfield assignment
  *
  * We strip any top-level FieldStore or assignment ArrayRef nodes that
- * appear in the input, and return the subexpression that's to be assigned.
- * If printit is true, we also print out the appropriate decoration for the
- * base column name (that the caller just printed).
+ * appear in the input, printing them as decoration for the base column
+ * name (which we assume the caller just printed).  Return the subexpression
+ * that's to be assigned.
  */
 static Node *
-processIndirection(Node *node, deparse_context *context, bool printit)
+processIndirection(Node *node, deparse_context *context)
 {
        StringInfo      buf = context->buf;
 
@@ -9594,8 +9576,7 @@ processIndirection(Node *node, deparse_context *context, bool printit)
                        Assert(list_length(fstore->fieldnums) == 1);
                        fieldname = get_relid_attribute_name(typrelid,
                                                                                        linitial_int(fstore->fieldnums));
-                       if (printit)
-                               appendStringInfo(buf, ".%s", quote_identifier(fieldname));
+                       appendStringInfo(buf, ".%s", quote_identifier(fieldname));
 
                        /*
                         * We ignore arg since it should be an uninteresting reference to
@@ -9609,8 +9590,7 @@ processIndirection(Node *node, deparse_context *context, bool printit)
 
                        if (aref->refassgnexpr == NULL)
                                break;
-                       if (printit)
-                               printSubscripts(aref, context);
+                       printSubscripts(aref, context);
 
                        /*
                         * We ignore refexpr since it should be an uninteresting reference
index c3608cfba5bb192a1360a5c170c16660654c7df6..25e26dbbf6d9c0e167f0878ba0c3720ade1fba32 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201608021
+#define CATALOG_VERSION_NO     201608031
 
 #endif
index 96c7f9e430aa9a096c364ef74eff9e2d6cf3502a..70107b5bf27b1ad8fbb3163f0de8fd4afa142af8 100644 (file)
@@ -81,3 +81,82 @@ select col1, col2, char_length(col3) from inserttest;
 (8 rows)
 
 drop table inserttest;
+--
+-- check indirection (field/array assignment), cf bug #14265
+--
+-- these tests are aware that transformInsertStmt has 3 separate code paths
+--
+create type insert_test_type as (if1 int, if2 text[]);
+create table inserttest (f1 int, f2 int[],
+                         f3 insert_test_type, f4 insert_test_type[]);
+insert into inserttest (f2[1], f2[2]) values (1,2);
+insert into inserttest (f2[1], f2[2]) values (3,4), (5,6);
+insert into inserttest (f2[1], f2[2]) select 7,8;
+insert into inserttest (f2[1], f2[2]) values (1,default);  -- not supported
+ERROR:  cannot set an array element to DEFAULT
+LINE 1: insert into inserttest (f2[1], f2[2]) values (1,default);
+                                       ^
+insert into inserttest (f3.if1, f3.if2) values (1,array['foo']);
+insert into inserttest (f3.if1, f3.if2) values (1,'{foo}'), (2,'{bar}');
+insert into inserttest (f3.if1, f3.if2) select 3, '{baz,quux}';
+insert into inserttest (f3.if1, f3.if2) values (1,default);  -- not supported
+ERROR:  cannot set a subfield to DEFAULT
+LINE 1: insert into inserttest (f3.if1, f3.if2) values (1,default);
+                                        ^
+insert into inserttest (f3.if2[1], f3.if2[2]) values ('foo', 'bar');
+insert into inserttest (f3.if2[1], f3.if2[2]) values ('foo', 'bar'), ('baz', 'quux');
+insert into inserttest (f3.if2[1], f3.if2[2]) select 'bear', 'beer';
+insert into inserttest (f4[1].if2[1], f4[1].if2[2]) values ('foo', 'bar');
+insert into inserttest (f4[1].if2[1], f4[1].if2[2]) values ('foo', 'bar'), ('baz', 'quux');
+insert into inserttest (f4[1].if2[1], f4[1].if2[2]) select 'bear', 'beer';
+select * from inserttest;
+ f1 |  f2   |        f3        |           f4           
+----+-------+------------------+------------------------
+    | {1,2} |                  | 
+    | {3,4} |                  | 
+    | {5,6} |                  | 
+    | {7,8} |                  | 
+    |       | (1,{foo})        | 
+    |       | (1,{foo})        | 
+    |       | (2,{bar})        | 
+    |       | (3,"{baz,quux}") | 
+    |       | (,"{foo,bar}")   | 
+    |       | (,"{foo,bar}")   | 
+    |       | (,"{baz,quux}")  | 
+    |       | (,"{bear,beer}") | 
+    |       |                  | {"(,\"{foo,bar}\")"}
+    |       |                  | {"(,\"{foo,bar}\")"}
+    |       |                  | {"(,\"{baz,quux}\")"}
+    |       |                  | {"(,\"{bear,beer}\")"}
+(16 rows)
+
+-- also check reverse-listing
+create table inserttest2 (f1 bigint, f2 text);
+create rule irule1 as on insert to inserttest2 do also
+  insert into inserttest (f3.if2[1], f3.if2[2])
+  values (new.f1,new.f2);
+create rule irule2 as on insert to inserttest2 do also
+  insert into inserttest (f4[1].if1, f4[1].if2[2])
+  values (1,'fool'),(new.f1,new.f2);
+create rule irule3 as on insert to inserttest2 do also
+  insert into inserttest (f4[1].if1, f4[1].if2[2])
+  select new.f1, new.f2;
+\d+ inserttest2
+                     Table "public.inserttest2"
+ Column |  Type  | Modifiers | Storage  | Stats target | Description 
+--------+--------+-----------+----------+--------------+-------------
+ f1     | bigint |           | plain    |              | 
+ f2     | text   |           | extended |              | 
+Rules:
+    irule1 AS
+    ON INSERT TO inserttest2 DO  INSERT INTO inserttest (f3.if2[1], f3.if2[2])
+  VALUES (new.f1, new.f2)
+    irule2 AS
+    ON INSERT TO inserttest2 DO  INSERT INTO inserttest (f4[1].if1, f4[1].if2[2]) VALUES (1,'fool'::text), (new.f1,new.f2)
+    irule3 AS
+    ON INSERT TO inserttest2 DO  INSERT INTO inserttest (f4[1].if1, f4[1].if2[2])  SELECT new.f1,
+            new.f2
+
+drop table inserttest2;
+drop table inserttest;
+drop type insert_test_type;
index a0ae85003fbec1e33c5ac37daa4a25da2c2cf50c..7924d5d46deeddb1325e865d4888f3446aa5dbd9 100644 (file)
@@ -36,3 +36,51 @@ insert into inserttest values(30, 50, repeat('x', 10000));
 select col1, col2, char_length(col3) from inserttest;
 
 drop table inserttest;
+
+--
+-- check indirection (field/array assignment), cf bug #14265
+--
+-- these tests are aware that transformInsertStmt has 3 separate code paths
+--
+
+create type insert_test_type as (if1 int, if2 text[]);
+
+create table inserttest (f1 int, f2 int[],
+                         f3 insert_test_type, f4 insert_test_type[]);
+
+insert into inserttest (f2[1], f2[2]) values (1,2);
+insert into inserttest (f2[1], f2[2]) values (3,4), (5,6);
+insert into inserttest (f2[1], f2[2]) select 7,8;
+insert into inserttest (f2[1], f2[2]) values (1,default);  -- not supported
+
+insert into inserttest (f3.if1, f3.if2) values (1,array['foo']);
+insert into inserttest (f3.if1, f3.if2) values (1,'{foo}'), (2,'{bar}');
+insert into inserttest (f3.if1, f3.if2) select 3, '{baz,quux}';
+insert into inserttest (f3.if1, f3.if2) values (1,default);  -- not supported
+
+insert into inserttest (f3.if2[1], f3.if2[2]) values ('foo', 'bar');
+insert into inserttest (f3.if2[1], f3.if2[2]) values ('foo', 'bar'), ('baz', 'quux');
+insert into inserttest (f3.if2[1], f3.if2[2]) select 'bear', 'beer';
+
+insert into inserttest (f4[1].if2[1], f4[1].if2[2]) values ('foo', 'bar');
+insert into inserttest (f4[1].if2[1], f4[1].if2[2]) values ('foo', 'bar'), ('baz', 'quux');
+insert into inserttest (f4[1].if2[1], f4[1].if2[2]) select 'bear', 'beer';
+
+select * from inserttest;
+
+-- also check reverse-listing
+create table inserttest2 (f1 bigint, f2 text);
+create rule irule1 as on insert to inserttest2 do also
+  insert into inserttest (f3.if2[1], f3.if2[2])
+  values (new.f1,new.f2);
+create rule irule2 as on insert to inserttest2 do also
+  insert into inserttest (f4[1].if1, f4[1].if2[2])
+  values (1,'fool'),(new.f1,new.f2);
+create rule irule3 as on insert to inserttest2 do also
+  insert into inserttest (f4[1].if1, f4[1].if2[2])
+  select new.f1, new.f2;
+\d+ inserttest2
+
+drop table inserttest2;
+drop table inserttest;
+drop type insert_test_type;