]> granicus.if.org Git - postgresql/commitdiff
postgres_fdw: Push down UPDATE/DELETE joins to remote servers.
authorRobert Haas <rhaas@postgresql.org>
Wed, 7 Feb 2018 20:34:30 +0000 (15:34 -0500)
committerRobert Haas <rhaas@postgresql.org>
Wed, 7 Feb 2018 20:34:30 +0000 (15:34 -0500)
Commit 0bf3ae88af330496517722e391e7c975e6bad219 allowed direct
foreign table modification; instead of fetching each row, updating
it locally, and then pushing the modification back to the remote
side, we would instead do all the work on the remote server via a
single remote UPDATE or DELETE command.  However, that commit only
enabled this optimization when join tree consisted only of the
target table.

This change allows the same optimization when an UPDATE statement
has a FROM clause or a DELETE statement has a USING clause.  This
works much like ordinary foreign join pushdown, in that the tables
must be on the same remote server, relevant parts of the query
must be pushdown-safe, and so forth.

Etsuro Fujita, reviewed by Ashutosh Bapat, Rushabh Lathia, and me.
Some formatting corrections by me.

Discussion: http://postgr.es/m/5A57193A.2080003@lab.ntt.co.jp
Discussion: http://postgr.es/m/b9cee735-62f8-6c07-7528-6364ce9347d0@lab.ntt.co.jp

contrib/postgres_fdw/deparse.c
contrib/postgres_fdw/expected/postgres_fdw.out
contrib/postgres_fdw/postgres_fdw.c
contrib/postgres_fdw/postgres_fdw.h
contrib/postgres_fdw/sql/postgres_fdw.sql

index e111b09c7ccf1aa72bb6469f769e5def6930d5b3..32c7261dae587276cfc8a6d7c4543a9d87ef118b 100644 (file)
@@ -132,7 +132,9 @@ static void deparseTargetList(StringInfo buf,
                                  Bitmapset *attrs_used,
                                  bool qualify_col,
                                  List **retrieved_attrs);
-static void deparseExplicitTargetList(List *tlist, List **retrieved_attrs,
+static void deparseExplicitTargetList(List *tlist,
+                                                 bool is_returning,
+                                                 List **retrieved_attrs,
                                                  deparse_expr_cxt *context);
 static void deparseSubqueryTargetList(deparse_expr_cxt *context);
 static void deparseReturningList(StringInfo buf, PlannerInfo *root,
@@ -168,11 +170,13 @@ static void deparseLockingClause(deparse_expr_cxt *context);
 static void appendOrderByClause(List *pathkeys, deparse_expr_cxt *context);
 static void appendConditions(List *exprs, deparse_expr_cxt *context);
 static void deparseFromExprForRel(StringInfo buf, PlannerInfo *root,
-                                         RelOptInfo *joinrel, bool use_alias, List **params_list);
+                                         RelOptInfo *foreignrel, bool use_alias,
+                                         Index ignore_rel, List **ignore_conds,
+                                         List **params_list);
 static void deparseFromExpr(List *quals, deparse_expr_cxt *context);
 static void deparseRangeTblRef(StringInfo buf, PlannerInfo *root,
                                   RelOptInfo *foreignrel, bool make_subquery,
-                                  List **params_list);
+                                  Index ignore_rel, List **ignore_conds, List **params_list);
 static void deparseAggref(Aggref *node, deparse_expr_cxt *context);
 static void appendGroupByClause(List *tlist, deparse_expr_cxt *context);
 static void appendAggOrderBy(List *orderList, List *targetList,
@@ -1028,7 +1032,7 @@ deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs,
                 * For a join or upper relation the input tlist gives the list of
                 * columns required to be fetched from the foreign server.
                 */
-               deparseExplicitTargetList(tlist, retrieved_attrs, context);
+               deparseExplicitTargetList(tlist, false, retrieved_attrs, context);
        }
        else
        {
@@ -1071,7 +1075,7 @@ deparseFromExpr(List *quals, deparse_expr_cxt *context)
        appendStringInfoString(buf, " FROM ");
        deparseFromExprForRel(buf, context->root, scanrel,
                                                  (bms_num_members(scanrel->relids) > 1),
-                                                 context->params_list);
+                                                 (Index) 0, NULL, context->params_list);
 
        /* Construct WHERE clause */
        if (quals != NIL)
@@ -1340,9 +1344,14 @@ get_jointype_name(JoinType jointype)
  *
  * retrieved_attrs is the list of continuously increasing integers starting
  * from 1. It has same number of entries as tlist.
+ *
+ * This is used for both SELECT and RETURNING targetlists; the is_returning
+ * parameter is true only for a RETURNING targetlist.
  */
 static void
-deparseExplicitTargetList(List *tlist, List **retrieved_attrs,
+deparseExplicitTargetList(List *tlist,
+                                                 bool is_returning,
+                                                 List **retrieved_attrs,
                                                  deparse_expr_cxt *context)
 {
        ListCell   *lc;
@@ -1357,13 +1366,16 @@ deparseExplicitTargetList(List *tlist, List **retrieved_attrs,
 
                if (i > 0)
                        appendStringInfoString(buf, ", ");
+               else if (is_returning)
+                       appendStringInfoString(buf, " RETURNING ");
+
                deparseExpr((Expr *) tle->expr, context);
 
                *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
                i++;
        }
 
-       if (i == 0)
+       if (i == 0 && !is_returning)
                appendStringInfoString(buf, "NULL");
 }
 
@@ -1406,10 +1418,17 @@ deparseSubqueryTargetList(deparse_expr_cxt *context)
  * The function constructs ... JOIN ... ON ... for join relation. For a base
  * relation it just returns schema-qualified tablename, with the appropriate
  * alias if so requested.
+ *
+ * 'ignore_rel' is either zero or the RT index of a target relation.  In the
+ * latter case the function constructs FROM clause of UPDATE or USING clause
+ * of DELETE; it deparses the join relation as if the relation never contained
+ * the target relation, and creates a List of conditions to be deparsed into
+ * the top-level WHERE clause, which is returned to *ignore_conds.
  */
 static void
 deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
-                                         bool use_alias, List **params_list)
+                                         bool use_alias, Index ignore_rel, List **ignore_conds,
+                                         List **params_list)
 {
        PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
 
@@ -1417,16 +1436,89 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
        {
                StringInfoData join_sql_o;
                StringInfoData join_sql_i;
+               RelOptInfo *outerrel = fpinfo->outerrel;
+               RelOptInfo *innerrel = fpinfo->innerrel;
+               bool            outerrel_is_target = false;
+               bool            innerrel_is_target = false;
 
-               /* Deparse outer relation */
-               initStringInfo(&join_sql_o);
-               deparseRangeTblRef(&join_sql_o, root, fpinfo->outerrel,
-                                                  fpinfo->make_outerrel_subquery, params_list);
+               if (ignore_rel > 0 && bms_is_member(ignore_rel, foreignrel->relids))
+               {
+                       /*
+                        * If this is an inner join, add joinclauses to *ignore_conds and
+                        * set it to empty so that those can be deparsed into the WHERE
+                        * clause.  Note that since the target relation can never be
+                        * within the nullable side of an outer join, those could safely
+                        * be pulled up into the WHERE clause (see foreign_join_ok()).
+                        * Note also that since the target relation is only inner-joined
+                        * to any other relation in the query, all conditions in the join
+                        * tree mentioning the target relation could be deparsed into the
+                        * WHERE clause by doing this recursively.
+                        */
+                       if (fpinfo->jointype == JOIN_INNER)
+                       {
+                               *ignore_conds = list_concat(*ignore_conds,
+                                                                                       list_copy(fpinfo->joinclauses));
+                               fpinfo->joinclauses = NIL;
+                       }
 
-               /* Deparse inner relation */
-               initStringInfo(&join_sql_i);
-               deparseRangeTblRef(&join_sql_i, root, fpinfo->innerrel,
-                                                  fpinfo->make_innerrel_subquery, params_list);
+                       /*
+                        * Check if either of the input relations is the target relation.
+                        */
+                       if (outerrel->relid == ignore_rel)
+                               outerrel_is_target = true;
+                       else if (innerrel->relid == ignore_rel)
+                               innerrel_is_target = true;
+               }
+
+               /* Deparse outer relation if not the target relation. */
+               if (!outerrel_is_target)
+               {
+                       initStringInfo(&join_sql_o);
+                       deparseRangeTblRef(&join_sql_o, root, outerrel,
+                                                          fpinfo->make_outerrel_subquery,
+                                                          ignore_rel, ignore_conds, params_list);
+
+                       /*
+                        * If inner relation is the target relation, skip deparsing it.
+                        * Note that since the join of the target relation with any other
+                        * relation in the query is an inner join and can never be within
+                        * the nullable side of an outer join, the join could be
+                        * interchanged with higher-level joins (cf. identity 1 on outer
+                        * join reordering shown in src/backend/optimizer/README), which
+                        * means it's safe to skip the target-relation deparsing here.
+                        */
+                       if (innerrel_is_target)
+                       {
+                               Assert(fpinfo->jointype == JOIN_INNER);
+                               Assert(fpinfo->joinclauses == NIL);
+                               appendStringInfo(buf, "%s", join_sql_o.data);
+                               return;
+                       }
+               }
+
+               /* Deparse inner relation if not the target relation. */
+               if (!innerrel_is_target)
+               {
+                       initStringInfo(&join_sql_i);
+                       deparseRangeTblRef(&join_sql_i, root, innerrel,
+                                                          fpinfo->make_innerrel_subquery,
+                                                          ignore_rel, ignore_conds, params_list);
+
+                       /*
+                        * If outer relation is the target relation, skip deparsing it.
+                        * See the above note about safety.
+                        */
+                       if (outerrel_is_target)
+                       {
+                               Assert(fpinfo->jointype == JOIN_INNER);
+                               Assert(fpinfo->joinclauses == NIL);
+                               appendStringInfo(buf, "%s", join_sql_i.data);
+                               return;
+                       }
+               }
+
+               /* Neither of the relations is the target relation. */
+               Assert(!outerrel_is_target && !innerrel_is_target);
 
                /*
                 * For a join relation FROM clause entry is deparsed as
@@ -1486,7 +1578,8 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
  */
 static void
 deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
-                                  bool make_subquery, List **params_list)
+                                  bool make_subquery, Index ignore_rel, List **ignore_conds,
+                                  List **params_list)
 {
        PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
 
@@ -1501,6 +1594,14 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
                List       *retrieved_attrs;
                int                     ncols;
 
+               /*
+                * The given relation shouldn't contain the target relation, because
+                * this should only happen for input relations for a full join, and
+                * such relations can never contain an UPDATE/DELETE target.
+                */
+               Assert(ignore_rel == 0 ||
+                          !bms_is_member(ignore_rel, foreignrel->relids));
+
                /* Deparse the subquery representing the relation. */
                appendStringInfoChar(buf, '(');
                deparseSelectStmtForRel(buf, root, foreignrel, NIL,
@@ -1534,7 +1635,8 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
                }
        }
        else
-               deparseFromExprForRel(buf, root, foreignrel, true, params_list);
+               deparseFromExprForRel(buf, root, foreignrel, true, ignore_rel,
+                                                         ignore_conds, params_list);
 }
 
 /*
@@ -1645,13 +1747,23 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root,
 /*
  * deparse remote UPDATE statement
  *
- * The statement text is appended to buf, and we also create an integer List
- * of the columns being retrieved by RETURNING (if any), which is returned
- * to *retrieved_attrs.
+ * 'buf' is the output buffer to append the statement to
+ * 'rtindex' is the RT index of the associated target relation
+ * 'rel' is the relation descriptor for the target relation
+ * 'foreignrel' is the RelOptInfo for the target relation or the join relation
+ *             containing all base relations in the query
+ * 'targetlist' is the tlist of the underlying foreign-scan plan node
+ * 'targetAttrs' is the target columns of the UPDATE
+ * 'remote_conds' is the qual clauses that must be evaluated remotely
+ * '*params_list' is an output list of exprs that will become remote Params
+ * 'returningList' is the RETURNING targetlist
+ * '*retrieved_attrs' is an output list of integers of columns being retrieved
+ *             by RETURNING (if any)
  */
 void
 deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
                                           Index rtindex, Relation rel,
+                                          RelOptInfo *foreignrel,
                                           List *targetlist,
                                           List *targetAttrs,
                                           List *remote_conds,
@@ -1659,7 +1771,6 @@ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
                                           List *returningList,
                                           List **retrieved_attrs)
 {
-       RelOptInfo *baserel = root->simple_rel_array[rtindex];
        deparse_expr_cxt context;
        int                     nestlevel;
        bool            first;
@@ -1667,13 +1778,15 @@ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
 
        /* Set up context struct for recursion */
        context.root = root;
-       context.foreignrel = baserel;
-       context.scanrel = baserel;
+       context.foreignrel = foreignrel;
+       context.scanrel = foreignrel;
        context.buf = buf;
        context.params_list = params_list;
 
        appendStringInfoString(buf, "UPDATE ");
        deparseRelation(buf, rel);
+       if (foreignrel->reloptkind == RELOPT_JOINREL)
+               appendStringInfo(buf, " %s%d", REL_ALIAS_PREFIX, rtindex);
        appendStringInfoString(buf, " SET ");
 
        /* Make sure any constants in the exprs are printed portably */
@@ -1700,14 +1813,28 @@ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
 
        reset_transmission_modes(nestlevel);
 
+       if (foreignrel->reloptkind == RELOPT_JOINREL)
+       {
+               List       *ignore_conds = NIL;
+
+               appendStringInfo(buf, " FROM ");
+               deparseFromExprForRel(buf, root, foreignrel, true, rtindex,
+                                                         &ignore_conds, params_list);
+               remote_conds = list_concat(remote_conds, ignore_conds);
+       }
+
        if (remote_conds)
        {
                appendStringInfoString(buf, " WHERE ");
                appendConditions(remote_conds, &context);
        }
 
-       deparseReturningList(buf, root, rtindex, rel, false,
-                                                returningList, retrieved_attrs);
+       if (foreignrel->reloptkind == RELOPT_JOINREL)
+               deparseExplicitTargetList(returningList, true, retrieved_attrs,
+                                                                 &context);
+       else
+               deparseReturningList(buf, root, rtindex, rel, false,
+                                                        returningList, retrieved_attrs);
 }
 
 /*
@@ -1735,30 +1862,49 @@ deparseDeleteSql(StringInfo buf, PlannerInfo *root,
 /*
  * deparse remote DELETE statement
  *
- * The statement text is appended to buf, and we also create an integer List
- * of the columns being retrieved by RETURNING (if any), which is returned
- * to *retrieved_attrs.
+ * 'buf' is the output buffer to append the statement to
+ * 'rtindex' is the RT index of the associated target relation
+ * 'rel' is the relation descriptor for the target relation
+ * 'foreignrel' is the RelOptInfo for the target relation or the join relation
+ *             containing all base relations in the query
+ * 'remote_conds' is the qual clauses that must be evaluated remotely
+ * '*params_list' is an output list of exprs that will become remote Params
+ * 'returningList' is the RETURNING targetlist
+ * '*retrieved_attrs' is an output list of integers of columns being retrieved
+ *             by RETURNING (if any)
  */
 void
 deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
                                           Index rtindex, Relation rel,
+                                          RelOptInfo *foreignrel,
                                           List *remote_conds,
                                           List **params_list,
                                           List *returningList,
                                           List **retrieved_attrs)
 {
-       RelOptInfo *baserel = root->simple_rel_array[rtindex];
        deparse_expr_cxt context;
 
        /* Set up context struct for recursion */
        context.root = root;
-       context.foreignrel = baserel;
-       context.scanrel = baserel;
+       context.foreignrel = foreignrel;
+       context.scanrel = foreignrel;
        context.buf = buf;
        context.params_list = params_list;
 
        appendStringInfoString(buf, "DELETE FROM ");
        deparseRelation(buf, rel);
+       if (foreignrel->reloptkind == RELOPT_JOINREL)
+               appendStringInfo(buf, " %s%d", REL_ALIAS_PREFIX, rtindex);
+
+       if (foreignrel->reloptkind == RELOPT_JOINREL)
+       {
+               List       *ignore_conds = NIL;
+
+               appendStringInfo(buf, " USING ");
+               deparseFromExprForRel(buf, root, foreignrel, true, rtindex,
+                                                         &ignore_conds, params_list);
+               remote_conds = list_concat(remote_conds, ignore_conds);
+       }
 
        if (remote_conds)
        {
@@ -1766,8 +1912,12 @@ deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
                appendConditions(remote_conds, &context);
        }
 
-       deparseReturningList(buf, root, rtindex, rel, false,
-                                                returningList, retrieved_attrs);
+       if (foreignrel->reloptkind == RELOPT_JOINREL)
+               deparseExplicitTargetList(returningList, true, retrieved_attrs,
+                                                                 &context);
+       else
+               deparseReturningList(buf, root, rtindex, rel, false,
+                                                        returningList, retrieved_attrs);
 }
 
 /*
index 5e1f44041c044f5a142f62455b9dceebca8f4ba9..885a45b0df37d89c67dd19a9d7d5995e40adfca0 100644 (file)
@@ -4399,27 +4399,13 @@ UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING
 
 EXPLAIN (verbose, costs off)
 UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
-  FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;                               -- can't be pushed down
-                                                                                                                                                        QUERY PLAN                                                                                                                                                         
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+  FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;                               -- can be pushed down
+                                                                                                   QUERY PLAN                                                                                                    
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  Update on public.ft2
-   Remote SQL: UPDATE "S 1"."T 1" SET c2 = $2, c3 = $3, c7 = $4 WHERE ctid = $1
-   ->  Foreign Scan
-         Output: ft2.c1, (ft2.c2 + 500), NULL::integer, (ft2.c3 || '_update9'::text), ft2.c4, ft2.c5, ft2.c6, 'ft2       '::character(10), ft2.c8, ft2.ctid, ft1.*
-         Relations: (public.ft2) INNER JOIN (public.ft1)
-         Remote SQL: SELECT r1."C 1", r1.c2, r1.c3, r1.c4, r1.c5, r1.c6, r1.c8, r1.ctid, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2."C 1", r2.c2, r2.c3, r2.c4, r2.c5, r2.c6, r2.c7, r2.c8) END FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1.c2 = r2."C 1")) AND (((r2."C 1" % 10) = 9)))) FOR UPDATE OF r1
-         ->  Hash Join
-               Output: ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c8, ft2.ctid, ft1.*
-               Hash Cond: (ft2.c2 = ft1.c1)
-               ->  Foreign Scan on public.ft2
-                     Output: ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c8, ft2.ctid
-                     Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c8, ctid FROM "S 1"."T 1" FOR UPDATE
-               ->  Hash
-                     Output: ft1.*, ft1.c1
-                     ->  Foreign Scan on public.ft1
-                           Output: ft1.*, ft1.c1
-                           Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 9))
-(17 rows)
+   ->  Foreign Update
+         Remote SQL: UPDATE "S 1"."T 1" r1 SET c2 = (r1.c2 + 500), c3 = (r1.c3 || '_update9'::text), c7 = 'ft2       '::character(10) FROM "S 1"."T 1" r2 WHERE ((r1.c2 = r2."C 1")) AND (((r2."C 1" % 10) = 9))
+(3 rows)
 
 UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
   FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
@@ -4542,27 +4528,13 @@ DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
 (103 rows)
 
 EXPLAIN (verbose, costs off)
-DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;                -- can't be pushed down
-                                                                                                                              QUERY PLAN                                                                                                                               
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;                -- can be pushed down
+                                                         QUERY PLAN                                                         
+----------------------------------------------------------------------------------------------------------------------------
  Delete on public.ft2
-   Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1
-   ->  Foreign Scan
-         Output: ft2.ctid, ft1.*
-         Relations: (public.ft2) INNER JOIN (public.ft1)
-         Remote SQL: SELECT r1.ctid, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2."C 1", r2.c2, r2.c3, r2.c4, r2.c5, r2.c6, r2.c7, r2.c8) END FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1.c2 = r2."C 1")) AND (((r2."C 1" % 10) = 2)))) FOR UPDATE OF r1
-         ->  Hash Join
-               Output: ft2.ctid, ft1.*
-               Hash Cond: (ft2.c2 = ft1.c1)
-               ->  Foreign Scan on public.ft2
-                     Output: ft2.ctid, ft2.c2
-                     Remote SQL: SELECT c2, ctid FROM "S 1"."T 1" FOR UPDATE
-               ->  Hash
-                     Output: ft1.*, ft1.c1
-                     ->  Foreign Scan on public.ft1
-                           Output: ft1.*, ft1.c1
-                           Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 2))
-(17 rows)
+   ->  Foreign Delete
+         Remote SQL: DELETE FROM "S 1"."T 1" r1 USING "S 1"."T 1" r2 WHERE ((r1.c2 = r2."C 1")) AND (((r2."C 1" % 10) = 2))
+(3 rows)
 
 DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;
 SELECT c1,c2,c3,c4 FROM ft2 ORDER BY c1;
@@ -5438,6 +5410,195 @@ DELETE FROM ft2 WHERE c1 = 9999 RETURNING tableoid::regclass;
  ft2
 (1 row)
 
+-- Test UPDATE/DELETE with RETURNING on a three-table join
+INSERT INTO ft2 (c1,c2,c3)
+  SELECT id, id - 1200, to_char(id, 'FM00000') FROM generate_series(1201, 1300) id;
+EXPLAIN (verbose, costs off)
+UPDATE ft2 SET c3 = 'foo'
+  FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 1200 AND ft2.c2 = ft4.c1
+  RETURNING ft2.ctid, ft2, ft2.*, ft4.ctid, ft4, ft4.*;                             -- can be pushed down
+                                                                                                                                                                                   QUERY PLAN                                                                                                                                                                                    
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Update on public.ft2
+   Output: ft2.ctid, ft2.*, ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft4.ctid, ft4.*, ft4.c1, ft4.c2, ft4.c3
+   ->  Foreign Update
+         Remote SQL: UPDATE "S 1"."T 1" r1 SET c3 = 'foo'::text FROM ("S 1"."T 3" r2 INNER JOIN "S 1"."T 4" r3 ON (TRUE)) WHERE ((r2.c1 = r3.c1)) AND ((r1.c2 = r2.c1)) AND ((r1."C 1" > 1200)) RETURNING r1."C 1", r1.c2, r1.c3, r1.c4, r1.c5, r1.c6, r1.c7, r1.c8, r1.ctid, r2.ctid, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2.c1, r2.c2, r2.c3) END, r2.c1, r2.c2, r2.c3
+(4 rows)
+
+UPDATE ft2 SET c3 = 'foo'
+  FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 1200 AND ft2.c2 = ft4.c1
+  RETURNING ft2.ctid, ft2, ft2.*, ft4.ctid, ft4, ft4.*;
+   ctid   |              ft2               |  c1  | c2 | c3  | c4 | c5 | c6 |     c7     | c8 |  ctid  |      ft4       | c1 | c2 |   c3   
+----------+--------------------------------+------+----+-----+----+----+----+------------+----+--------+----------------+----+----+--------
+ (12,102) | (1206,6,foo,,,,"ft2       ",)  | 1206 |  6 | foo |    |    |    | ft2        |    | (0,6)  | (6,7,AAA006)   |  6 |  7 | AAA006
+ (12,103) | (1212,12,foo,,,,"ft2       ",) | 1212 | 12 | foo |    |    |    | ft2        |    | (0,12) | (12,13,AAA012) | 12 | 13 | AAA012
+ (12,104) | (1218,18,foo,,,,"ft2       ",) | 1218 | 18 | foo |    |    |    | ft2        |    | (0,18) | (18,19,AAA018) | 18 | 19 | AAA018
+ (12,105) | (1224,24,foo,,,,"ft2       ",) | 1224 | 24 | foo |    |    |    | ft2        |    | (0,24) | (24,25,AAA024) | 24 | 25 | AAA024
+ (12,106) | (1230,30,foo,,,,"ft2       ",) | 1230 | 30 | foo |    |    |    | ft2        |    | (0,30) | (30,31,AAA030) | 30 | 31 | AAA030
+ (12,107) | (1236,36,foo,,,,"ft2       ",) | 1236 | 36 | foo |    |    |    | ft2        |    | (0,36) | (36,37,AAA036) | 36 | 37 | AAA036
+ (12,108) | (1242,42,foo,,,,"ft2       ",) | 1242 | 42 | foo |    |    |    | ft2        |    | (0,42) | (42,43,AAA042) | 42 | 43 | AAA042
+ (12,109) | (1248,48,foo,,,,"ft2       ",) | 1248 | 48 | foo |    |    |    | ft2        |    | (0,48) | (48,49,AAA048) | 48 | 49 | AAA048
+ (12,110) | (1254,54,foo,,,,"ft2       ",) | 1254 | 54 | foo |    |    |    | ft2        |    | (0,54) | (54,55,AAA054) | 54 | 55 | AAA054
+ (12,111) | (1260,60,foo,,,,"ft2       ",) | 1260 | 60 | foo |    |    |    | ft2        |    | (0,60) | (60,61,AAA060) | 60 | 61 | AAA060
+ (12,112) | (1266,66,foo,,,,"ft2       ",) | 1266 | 66 | foo |    |    |    | ft2        |    | (0,66) | (66,67,AAA066) | 66 | 67 | AAA066
+ (12,113) | (1272,72,foo,,,,"ft2       ",) | 1272 | 72 | foo |    |    |    | ft2        |    | (0,72) | (72,73,AAA072) | 72 | 73 | AAA072
+ (12,114) | (1278,78,foo,,,,"ft2       ",) | 1278 | 78 | foo |    |    |    | ft2        |    | (0,78) | (78,79,AAA078) | 78 | 79 | AAA078
+ (12,115) | (1284,84,foo,,,,"ft2       ",) | 1284 | 84 | foo |    |    |    | ft2        |    | (0,84) | (84,85,AAA084) | 84 | 85 | AAA084
+ (12,116) | (1290,90,foo,,,,"ft2       ",) | 1290 | 90 | foo |    |    |    | ft2        |    | (0,90) | (90,91,AAA090) | 90 | 91 | AAA090
+ (12,117) | (1296,96,foo,,,,"ft2       ",) | 1296 | 96 | foo |    |    |    | ft2        |    | (0,96) | (96,97,AAA096) | 96 | 97 | AAA096
+(16 rows)
+
+EXPLAIN (verbose, costs off)
+DELETE FROM ft2
+  USING ft4 LEFT JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 1200 AND ft2.c1 % 10 = 0 AND ft2.c2 = ft4.c1
+  RETURNING 100;                                                                    -- can be pushed down
+                                                                                            QUERY PLAN                                                                                             
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Delete on public.ft2
+   Output: 100
+   ->  Foreign Delete
+         Remote SQL: DELETE FROM "S 1"."T 1" r1 USING ("S 1"."T 3" r2 LEFT JOIN "S 1"."T 4" r3 ON (((r2.c1 = r3.c1)))) WHERE ((r1.c2 = r2.c1)) AND ((r1."C 1" > 1200)) AND (((r1."C 1" % 10) = 0))
+(4 rows)
+
+DELETE FROM ft2
+  USING ft4 LEFT JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 1200 AND ft2.c1 % 10 = 0 AND ft2.c2 = ft4.c1
+  RETURNING 100;
+ ?column? 
+----------
+      100
+      100
+      100
+      100
+      100
+      100
+      100
+      100
+      100
+      100
+(10 rows)
+
+DELETE FROM ft2 WHERE ft2.c1 > 1200;
+-- Test UPDATE/DELETE with WHERE or JOIN/ON conditions containing
+-- user-defined operators/functions
+ALTER SERVER loopback OPTIONS (DROP extensions);
+INSERT INTO ft2 (c1,c2,c3)
+  SELECT id, id % 10, to_char(id, 'FM00000') FROM generate_series(2001, 2010) id;
+EXPLAIN (verbose, costs off)
+UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *;            -- can't be pushed down
+                                                QUERY PLAN                                                
+----------------------------------------------------------------------------------------------------------
+ Update on public.ft2
+   Output: c1, c2, c3, c4, c5, c6, c7, c8
+   Remote SQL: UPDATE "S 1"."T 1" SET c3 = $2 WHERE ctid = $1 RETURNING "C 1", c2, c3, c4, c5, c6, c7, c8
+   ->  Foreign Scan on public.ft2
+         Output: c1, c2, NULL::integer, 'bar'::text, c4, c5, c6, c7, c8, ctid
+         Filter: (postgres_fdw_abs(ft2.c1) > 2000)
+         Remote SQL: SELECT "C 1", c2, c4, c5, c6, c7, c8, ctid FROM "S 1"."T 1" FOR UPDATE
+(7 rows)
+
+UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *;
+  c1  | c2 | c3  | c4 | c5 | c6 |     c7     | c8 
+------+----+-----+----+----+----+------------+----
+ 2001 |  1 | bar |    |    |    | ft2        | 
+ 2002 |  2 | bar |    |    |    | ft2        | 
+ 2003 |  3 | bar |    |    |    | ft2        | 
+ 2004 |  4 | bar |    |    |    | ft2        | 
+ 2005 |  5 | bar |    |    |    | ft2        | 
+ 2006 |  6 | bar |    |    |    | ft2        | 
+ 2007 |  7 | bar |    |    |    | ft2        | 
+ 2008 |  8 | bar |    |    |    | ft2        | 
+ 2009 |  9 | bar |    |    |    | ft2        | 
+ 2010 |  0 | bar |    |    |    | ft2        | 
+(10 rows)
+
+EXPLAIN (verbose, costs off)
+UPDATE ft2 SET c3 = 'baz'
+  FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 2000 AND ft2.c2 === ft4.c1
+  RETURNING ft2.*, ft4.*, ft5.*;                                                    -- can't be pushed down
+                                                                                                                                          QUERY PLAN                                                                                                                                          
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Update on public.ft2
+   Output: ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft4.c1, ft4.c2, ft4.c3, ft5.c1, ft5.c2, ft5.c3
+   Remote SQL: UPDATE "S 1"."T 1" SET c3 = $2 WHERE ctid = $1 RETURNING "C 1", c2, c3, c4, c5, c6, c7, c8
+   ->  Nested Loop
+         Output: ft2.c1, ft2.c2, NULL::integer, 'baz'::text, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft2.ctid, ft4.*, ft5.*, ft4.c1, ft4.c2, ft4.c3, ft5.c1, ft5.c2, ft5.c3
+         Join Filter: (ft2.c2 === ft4.c1)
+         ->  Foreign Scan on public.ft2
+               Output: ft2.c1, ft2.c2, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft2.ctid
+               Remote SQL: SELECT "C 1", c2, c4, c5, c6, c7, c8, ctid FROM "S 1"."T 1" WHERE (("C 1" > 2000)) FOR UPDATE
+         ->  Foreign Scan
+               Output: ft4.*, ft4.c1, ft4.c2, ft4.c3, ft5.*, ft5.c1, ft5.c2, ft5.c3
+               Relations: (public.ft4) INNER JOIN (public.ft5)
+               Remote SQL: SELECT CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2.c1, r2.c2, r2.c3) END, r2.c1, r2.c2, r2.c3, CASE WHEN (r3.*)::text IS NOT NULL THEN ROW(r3.c1, r3.c2, r3.c3) END, r3.c1, r3.c2, r3.c3 FROM ("S 1"."T 3" r2 INNER JOIN "S 1"."T 4" r3 ON (((r2.c1 = r3.c1))))
+               ->  Hash Join
+                     Output: ft4.*, ft4.c1, ft4.c2, ft4.c3, ft5.*, ft5.c1, ft5.c2, ft5.c3
+                     Hash Cond: (ft4.c1 = ft5.c1)
+                     ->  Foreign Scan on public.ft4
+                           Output: ft4.*, ft4.c1, ft4.c2, ft4.c3
+                           Remote SQL: SELECT c1, c2, c3 FROM "S 1"."T 3"
+                     ->  Hash
+                           Output: ft5.*, ft5.c1, ft5.c2, ft5.c3
+                           ->  Foreign Scan on public.ft5
+                                 Output: ft5.*, ft5.c1, ft5.c2, ft5.c3
+                                 Remote SQL: SELECT c1, c2, c3 FROM "S 1"."T 4"
+(24 rows)
+
+UPDATE ft2 SET c3 = 'baz'
+  FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 2000 AND ft2.c2 === ft4.c1
+  RETURNING ft2.*, ft4.*, ft5.*;
+  c1  | c2 | c3  | c4 | c5 | c6 |     c7     | c8 | c1 | c2 |   c3   | c1 | c2 |   c3   
+------+----+-----+----+----+----+------------+----+----+----+--------+----+----+--------
+ 2006 |  6 | baz |    |    |    | ft2        |    |  6 |  7 | AAA006 |  6 |  7 | AAA006
+(1 row)
+
+EXPLAIN (verbose, costs off)
+DELETE FROM ft2
+  USING ft4 INNER JOIN ft5 ON (ft4.c1 === ft5.c1)
+  WHERE ft2.c1 > 2000 AND ft2.c2 = ft4.c1
+  RETURNING ft2.ctid, ft2.c1, ft2.c2, ft2.c3;                                       -- can't be pushed down
+                                                                                                                                                                     QUERY PLAN                                                                                                                                                                     
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Delete on public.ft2
+   Output: ft2.ctid, ft2.c1, ft2.c2, ft2.c3
+   Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1 RETURNING "C 1", c2, c3, ctid
+   ->  Foreign Scan
+         Output: ft2.ctid, ft4.*, ft5.*
+         Filter: (ft4.c1 === ft5.c1)
+         Relations: ((public.ft2) INNER JOIN (public.ft4)) INNER JOIN (public.ft5)
+         Remote SQL: SELECT r1.ctid, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2.c1, r2.c2, r2.c3) END, CASE WHEN (r3.*)::text IS NOT NULL THEN ROW(r3.c1, r3.c2, r3.c3) END, r2.c1, r3.c1 FROM (("S 1"."T 1" r1 INNER JOIN "S 1"."T 3" r2 ON (((r1.c2 = r2.c1)) AND ((r1."C 1" > 2000)))) INNER JOIN "S 1"."T 4" r3 ON (TRUE)) FOR UPDATE OF r1
+         ->  Nested Loop
+               Output: ft2.ctid, ft4.*, ft5.*, ft4.c1, ft5.c1
+               ->  Nested Loop
+                     Output: ft2.ctid, ft4.*, ft4.c1
+                     Join Filter: (ft2.c2 = ft4.c1)
+                     ->  Foreign Scan on public.ft2
+                           Output: ft2.ctid, ft2.c2
+                           Remote SQL: SELECT c2, ctid FROM "S 1"."T 1" WHERE (("C 1" > 2000)) FOR UPDATE
+                     ->  Foreign Scan on public.ft4
+                           Output: ft4.*, ft4.c1
+                           Remote SQL: SELECT c1, c2, c3 FROM "S 1"."T 3"
+               ->  Foreign Scan on public.ft5
+                     Output: ft5.*, ft5.c1
+                     Remote SQL: SELECT c1, c2, c3 FROM "S 1"."T 4"
+(22 rows)
+
+DELETE FROM ft2
+  USING ft4 INNER JOIN ft5 ON (ft4.c1 === ft5.c1)
+  WHERE ft2.c1 > 2000 AND ft2.c2 = ft4.c1
+  RETURNING ft2.ctid, ft2.c1, ft2.c2, ft2.c3;
+   ctid   |  c1  | c2 | c3  
+----------+------+----+-----
+ (12,112) | 2006 |  6 | baz
+(1 row)
+
+DELETE FROM ft2 WHERE ft2.c1 > 2000;
+ALTER SERVER loopback OPTIONS (ADD extensions 'postgres_fdw');
 -- Test that trigger on remote table works as expected
 CREATE OR REPLACE FUNCTION "S 1".F_BRTRIG() RETURNS trigger AS $$
 BEGIN
index 7ff43337a9a7808009c52ce03c261669253bd442..c1d7f8032e5a931550fc684da4a0544f703f3c86 100644 (file)
@@ -210,6 +210,11 @@ typedef struct PgFdwDirectModifyState
        PGresult   *result;                     /* result for query */
        int                     num_tuples;             /* # of result tuples */
        int                     next_tuple;             /* index of next one to return */
+       Relation        resultRel;              /* relcache entry for the target relation */
+       AttrNumber *attnoMap;           /* array of attnums of input user columns */
+       AttrNumber      ctidAttno;              /* attnum of input ctid column */
+       AttrNumber      oidAttno;               /* attnum of input oid column */
+       bool            hasSystemCols;  /* are there system columns of resultRel? */
 
        /* working memory context */
        MemoryContext temp_cxt;         /* context for per-tuple temporary data */
@@ -376,8 +381,17 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
                                                 TupleTableSlot *slot);
 static void store_returning_result(PgFdwModifyState *fmstate,
                                           TupleTableSlot *slot, PGresult *res);
+static List *build_remote_returning(Index rtindex, Relation rel,
+                                          List *returningList);
+static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
 static void execute_dml_stmt(ForeignScanState *node);
 static TupleTableSlot *get_returning_data(ForeignScanState *node);
+static void init_returning_filter(PgFdwDirectModifyState *dmstate,
+                                         List *fdw_scan_tlist,
+                                         Index rtindex);
+static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
+                                          TupleTableSlot *slot,
+                                          EState *estate);
 static void prepare_query_params(PlanState *node,
                                         List *fdw_exprs,
                                         int numParams,
@@ -2144,14 +2158,15 @@ postgresPlanDirectModify(PlannerInfo *root,
        if (subplan->qual != NIL)
                return false;
 
-       /*
-        * We can't handle an UPDATE or DELETE on a foreign join for now.
-        */
-       if (fscan->scan.scanrelid == 0)
-               return false;
-
        /* Safe to fetch data about the target foreign rel */
-       foreignrel = root->simple_rel_array[resultRelation];
+       if (fscan->scan.scanrelid == 0)
+       {
+               foreignrel = find_join_rel(root, fscan->fs_relids);
+               /* We should have a rel for this foreign join. */
+               Assert(foreignrel);
+       }
+       else
+               foreignrel = root->simple_rel_array[resultRelation];
        rte = root->simple_rte_array[resultRelation];
        fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
 
@@ -2212,8 +2227,23 @@ postgresPlanDirectModify(PlannerInfo *root,
         * Extract the relevant RETURNING list if any.
         */
        if (plan->returningLists)
+       {
                returningList = (List *) list_nth(plan->returningLists, subplan_index);
 
+               /*
+                * When performing an UPDATE/DELETE .. RETURNING on a join directly,
+                * we fetch from the foreign server any Vars specified in RETURNING
+                * that refer not only to the target relation but to non-target
+                * relations.  So we'll deparse them into the RETURNING clause of the
+                * remote query; use a targetlist consisting of them instead, which
+                * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
+                * node below.
+                */
+               if (fscan->scan.scanrelid == 0)
+                       returningList = build_remote_returning(resultRelation, rel,
+                                                                                                  returningList);
+       }
+
        /*
         * Construct the SQL command string.
         */
@@ -2221,6 +2251,7 @@ postgresPlanDirectModify(PlannerInfo *root,
        {
                case CMD_UPDATE:
                        deparseDirectUpdateSql(&sql, root, resultRelation, rel,
+                                                                  foreignrel,
                                                                   ((Plan *) fscan)->targetlist,
                                                                   targetAttrs,
                                                                   remote_exprs, &params_list,
@@ -2228,6 +2259,7 @@ postgresPlanDirectModify(PlannerInfo *root,
                        break;
                case CMD_DELETE:
                        deparseDirectDeleteSql(&sql, root, resultRelation, rel,
+                                                                  foreignrel,
                                                                   remote_exprs, &params_list,
                                                                   returningList, &retrieved_attrs);
                        break;
@@ -2255,6 +2287,19 @@ postgresPlanDirectModify(PlannerInfo *root,
                                                                        retrieved_attrs,
                                                                        makeInteger(plan->canSetTag));
 
+       /*
+        * Update the foreign-join-related fields.
+        */
+       if (fscan->scan.scanrelid == 0)
+       {
+               /* No need for the outer subplan. */
+               fscan->scan.plan.lefttree = NULL;
+
+               /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
+               if (returningList)
+                       rebuild_fdw_scan_tlist(fscan, returningList);
+       }
+
        heap_close(rel, NoLock);
        return true;
 }
@@ -2269,6 +2314,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
        ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
        EState     *estate = node->ss.ps.state;
        PgFdwDirectModifyState *dmstate;
+       Index           rtindex;
        RangeTblEntry *rte;
        Oid                     userid;
        ForeignTable *table;
@@ -2291,11 +2337,15 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
         * Identify which user to do the remote access as.  This should match what
         * ExecCheckRTEPerms() does.
         */
-       rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
+       rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
+       rte = rt_fetch(rtindex, estate->es_range_table);
        userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
 
        /* Get info about foreign table. */
-       dmstate->rel = node->ss.ss_currentRelation;
+       if (fsplan->scan.scanrelid == 0)
+               dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
+       else
+               dmstate->rel = node->ss.ss_currentRelation;
        table = GetForeignTable(RelationGetRelid(dmstate->rel));
        user = GetUserMapping(userid, table->serverid);
 
@@ -2305,6 +2355,21 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
         */
        dmstate->conn = GetConnection(user, false);
 
+       /* Update the foreign-join-related fields. */
+       if (fsplan->scan.scanrelid == 0)
+       {
+               /* Save info about foreign table. */
+               dmstate->resultRel = dmstate->rel;
+
+               /*
+                * Set dmstate->rel to NULL to teach get_returning_data() and
+                * make_tuple_from_result_row() that columns fetched from the remote
+                * server are described by fdw_scan_tlist of the foreign-scan plan
+                * node, not the tuple descriptor for the target relation.
+                */
+               dmstate->rel = NULL;
+       }
+
        /* Initialize state variable */
        dmstate->num_tuples = -1;       /* -1 means not set yet */
 
@@ -2325,7 +2390,24 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 
        /* Prepare for input conversion of RETURNING results. */
        if (dmstate->has_returning)
-               dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
+       {
+               TupleDesc       tupdesc;
+
+               if (fsplan->scan.scanrelid == 0)
+                       tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
+               else
+                       tupdesc = RelationGetDescr(dmstate->rel);
+
+               dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+               /*
+                * When performing an UPDATE/DELETE .. RETURNING on a join directly,
+                * initialize a filter to extract an updated/deleted tuple from a scan
+                * tuple.
+                */
+               if (fsplan->scan.scanrelid == 0)
+                       init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
+       }
 
        /*
         * Prepare for processing of parameters used in remote query, if any.
@@ -2406,6 +2488,10 @@ postgresEndDirectModify(ForeignScanState *node)
        ReleaseConnection(dmstate->conn);
        dmstate->conn = NULL;
 
+       /* close the target relation. */
+       if (dmstate->resultRel)
+               ExecCloseScanRelation(dmstate->resultRel);
+
        /* MemoryContext will be deleted automatically. */
 }
 
@@ -3272,6 +3358,136 @@ store_returning_result(PgFdwModifyState *fmstate,
        PG_END_TRY();
 }
 
+/*
+ * build_remote_returning
+ *             Build a RETURNING targetlist of a remote query for performing an
+ *             UPDATE/DELETE .. RETURNING on a join directly
+ */
+static List *
+build_remote_returning(Index rtindex, Relation rel, List *returningList)
+{
+       bool            have_wholerow = false;
+       List       *tlist = NIL;
+       List       *vars;
+       ListCell   *lc;
+
+       Assert(returningList);
+
+       vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
+
+       /*
+        * If there's a whole-row reference to the target relation, then we'll
+        * need all the columns of the relation.
+        */
+       foreach(lc, vars)
+       {
+               Var                *var = (Var *) lfirst(lc);
+
+               if (IsA(var, Var) &&
+                       var->varno == rtindex &&
+                       var->varattno == InvalidAttrNumber)
+               {
+                       have_wholerow = true;
+                       break;
+               }
+       }
+
+       if (have_wholerow)
+       {
+               TupleDesc       tupdesc = RelationGetDescr(rel);
+               int                     i;
+
+               for (i = 1; i <= tupdesc->natts; i++)
+               {
+                       Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
+                       Var                *var;
+
+                       /* Ignore dropped attributes. */
+                       if (attr->attisdropped)
+                               continue;
+
+                       var = makeVar(rtindex,
+                                                 i,
+                                                 attr->atttypid,
+                                                 attr->atttypmod,
+                                                 attr->attcollation,
+                                                 0);
+
+                       tlist = lappend(tlist,
+                                                       makeTargetEntry((Expr *) var,
+                                                                                       list_length(tlist) + 1,
+                                                                                       NULL,
+                                                                                       false));
+               }
+       }
+
+       /* Now add any remaining columns to tlist. */
+       foreach(lc, vars)
+       {
+               Var                *var = (Var *) lfirst(lc);
+
+               /*
+                * No need for whole-row references to the target relation.  We don't
+                * need system columns other than ctid and oid either, since those are
+                * set locally.
+                */
+               if (IsA(var, Var) &&
+                       var->varno == rtindex &&
+                       var->varattno <= InvalidAttrNumber &&
+                       var->varattno != SelfItemPointerAttributeNumber &&
+                       var->varattno != ObjectIdAttributeNumber)
+                       continue;                       /* don't need it */
+
+               if (tlist_member((Expr *) var, tlist))
+                       continue;                       /* already got it */
+
+               tlist = lappend(tlist,
+                                               makeTargetEntry((Expr *) var,
+                                                                               list_length(tlist) + 1,
+                                                                               NULL,
+                                                                               false));
+       }
+
+       list_free(vars);
+
+       return tlist;
+}
+
+/*
+ * rebuild_fdw_scan_tlist
+ *             Build new fdw_scan_tlist of given foreign-scan plan node from given
+ *             tlist
+ *
+ * There might be columns that the fdw_scan_tlist of the given foreign-scan
+ * plan node contains that the given tlist doesn't.  The fdw_scan_tlist would
+ * have contained resjunk columns such as 'ctid' of the target relation and
+ * 'wholerow' of non-target relations, but the tlist might not contain them,
+ * for example.  So, adjust the tlist so it contains all the columns specified
+ * in the fdw_scan_tlist; else setrefs.c will get confused.
+ */
+static void
+rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
+{
+       List       *new_tlist = tlist;
+       List       *old_tlist = fscan->fdw_scan_tlist;
+       ListCell   *lc;
+
+       foreach(lc, old_tlist)
+       {
+               TargetEntry *tle = (TargetEntry *) lfirst(lc);
+
+               if (tlist_member(tle->expr, new_tlist))
+                       continue;                       /* already got it */
+
+               new_tlist = lappend(new_tlist,
+                                                       makeTargetEntry(tle->expr,
+                                                                                       list_length(new_tlist) + 1,
+                                                                                       NULL,
+                                                                                       false));
+       }
+       fscan->fdw_scan_tlist = new_tlist;
+}
+
 /*
  * Execute a direct UPDATE/DELETE statement.
  */
@@ -3332,6 +3548,7 @@ get_returning_data(ForeignScanState *node)
        EState     *estate = node->ss.ps.state;
        ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
        TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+       TupleTableSlot *resultSlot;
 
        Assert(resultRelInfo->ri_projectReturning);
 
@@ -3349,7 +3566,10 @@ get_returning_data(ForeignScanState *node)
         * "UPDATE/DELETE .. RETURNING 1" for example.)
         */
        if (!dmstate->has_returning)
+       {
                ExecStoreAllNullTuple(slot);
+               resultSlot = slot;
+       }
        else
        {
                /*
@@ -3365,7 +3585,7 @@ get_returning_data(ForeignScanState *node)
                                                                                                dmstate->rel,
                                                                                                dmstate->attinmeta,
                                                                                                dmstate->retrieved_attrs,
-                                                                                               NULL,
+                                                                                               node,
                                                                                                dmstate->temp_cxt);
                        ExecStoreTuple(newtup, slot, InvalidBuffer, false);
                }
@@ -3376,15 +3596,204 @@ get_returning_data(ForeignScanState *node)
                        PG_RE_THROW();
                }
                PG_END_TRY();
+
+               /* Get the updated/deleted tuple. */
+               if (dmstate->rel)
+                       resultSlot = slot;
+               else
+                       resultSlot = apply_returning_filter(dmstate, slot, estate);
        }
        dmstate->next_tuple++;
 
        /* Make slot available for evaluation of the local query RETURNING list. */
-       resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
+       resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
+               resultSlot;
 
        return slot;
 }
 
+/*
+ * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
+ */
+static void
+init_returning_filter(PgFdwDirectModifyState *dmstate,
+                                         List *fdw_scan_tlist,
+                                         Index rtindex)
+{
+       TupleDesc       resultTupType = RelationGetDescr(dmstate->resultRel);
+       ListCell   *lc;
+       int                     i;
+
+       /*
+        * Calculate the mapping between the fdw_scan_tlist's entries and the
+        * result tuple's attributes.
+        *
+        * The "map" is an array of indexes of the result tuple's attributes in
+        * fdw_scan_tlist, i.e., one entry for every attribute of the result
+        * tuple.  We store zero for any attributes that don't have the
+        * corresponding entries in that list, marking that a NULL is needed in
+        * the result tuple.
+        *
+        * Also get the indexes of the entries for ctid and oid if any.
+        */
+       dmstate->attnoMap = (AttrNumber *)
+               palloc0(resultTupType->natts * sizeof(AttrNumber));
+
+       dmstate->ctidAttno = dmstate->oidAttno = 0;
+
+       i = 1;
+       dmstate->hasSystemCols = false;
+       foreach(lc, fdw_scan_tlist)
+       {
+               TargetEntry *tle = (TargetEntry *) lfirst(lc);
+               Var                *var = (Var *) tle->expr;
+
+               Assert(IsA(var, Var));
+
+               /*
+                * If the Var is a column of the target relation to be retrieved from
+                * the foreign server, get the index of the entry.
+                */
+               if (var->varno == rtindex &&
+                       list_member_int(dmstate->retrieved_attrs, i))
+               {
+                       int                     attrno = var->varattno;
+
+                       if (attrno < 0)
+                       {
+                               /*
+                                * We don't retrieve system columns other than ctid and oid.
+                                */
+                               if (attrno == SelfItemPointerAttributeNumber)
+                                       dmstate->ctidAttno = i;
+                               else if (attrno == ObjectIdAttributeNumber)
+                                       dmstate->oidAttno = i;
+                               else
+                                       Assert(false);
+                               dmstate->hasSystemCols = true;
+                       }
+                       else
+                       {
+                               /*
+                                * We don't retrieve whole-row references to the target
+                                * relation either.
+                                */
+                               Assert(attrno > 0);
+
+                               dmstate->attnoMap[attrno - 1] = i;
+                       }
+               }
+               i++;
+       }
+}
+
+/*
+ * Extract and return an updated/deleted tuple from a scan tuple.
+ */
+static TupleTableSlot *
+apply_returning_filter(PgFdwDirectModifyState *dmstate,
+                                          TupleTableSlot *slot,
+                                          EState *estate)
+{
+       TupleDesc       resultTupType = RelationGetDescr(dmstate->resultRel);
+       TupleTableSlot *resultSlot;
+       Datum      *values;
+       bool       *isnull;
+       Datum      *old_values;
+       bool       *old_isnull;
+       int                     i;
+
+       /*
+        * Use the trigger tuple slot as a place to store the result tuple.
+        */
+       resultSlot = estate->es_trig_tuple_slot;
+       if (resultSlot->tts_tupleDescriptor != resultTupType)
+               ExecSetSlotDescriptor(resultSlot, resultTupType);
+
+       /*
+        * Extract all the values of the scan tuple.
+        */
+       slot_getallattrs(slot);
+       old_values = slot->tts_values;
+       old_isnull = slot->tts_isnull;
+
+       /*
+        * Prepare to build the result tuple.
+        */
+       ExecClearTuple(resultSlot);
+       values = resultSlot->tts_values;
+       isnull = resultSlot->tts_isnull;
+
+       /*
+        * Transpose data into proper fields of the result tuple.
+        */
+       for (i = 0; i < resultTupType->natts; i++)
+       {
+               int                     j = dmstate->attnoMap[i];
+
+               if (j == 0)
+               {
+                       values[i] = (Datum) 0;
+                       isnull[i] = true;
+               }
+               else
+               {
+                       values[i] = old_values[j - 1];
+                       isnull[i] = old_isnull[j - 1];
+               }
+       }
+
+       /*
+        * Build the virtual tuple.
+        */
+       ExecStoreVirtualTuple(resultSlot);
+
+       /*
+        * If we have any system columns to return, install them.
+        */
+       if (dmstate->hasSystemCols)
+       {
+               HeapTuple       resultTup = ExecMaterializeSlot(resultSlot);
+
+               /* ctid */
+               if (dmstate->ctidAttno)
+               {
+                       ItemPointer ctid = NULL;
+
+                       ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
+                       resultTup->t_self = *ctid;
+               }
+
+               /* oid */
+               if (dmstate->oidAttno)
+               {
+                       Oid                     oid = InvalidOid;
+
+                       oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]);
+                       HeapTupleSetOid(resultTup, oid);
+               }
+
+               /*
+                * And remaining columns
+                *
+                * Note: since we currently don't allow the target relation to appear
+                * on the nullable side of an outer join, any system columns wouldn't
+                * go to NULL.
+                *
+                * Note: no need to care about tableoid here because it will be
+                * initialized in ExecProcessReturning().
+                */
+               HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
+               HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
+               HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
+       }
+
+       /*
+        * And return the result tuple.
+        */
+       return resultSlot;
+}
+
 /*
  * Prepare for processing of parameters used in remote query.
  */
@@ -4954,11 +5363,8 @@ make_tuple_from_result_row(PGresult *res,
                tupdesc = RelationGetDescr(rel);
        else
        {
-               PgFdwScanState *fdw_sstate;
-
                Assert(fsstate);
-               fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
-               tupdesc = fdw_sstate->tupdesc;
+               tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
        }
 
        values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
index 1ae809d2c6dfe3c2a022d79137513b79d3cf935f..d37cc88b6ec292d54f551751f2d1de8ff45a7f9e 100644 (file)
@@ -150,6 +150,7 @@ extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root,
                                 List **retrieved_attrs);
 extern void deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
                                           Index rtindex, Relation rel,
+                                          RelOptInfo *foreignrel,
                                           List *targetlist,
                                           List *targetAttrs,
                                           List *remote_conds,
@@ -162,6 +163,7 @@ extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root,
                                 List **retrieved_attrs);
 extern void deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
                                           Index rtindex, Relation rel,
+                                          RelOptInfo *foreignrel,
                                           List *remote_conds,
                                           List **params_list,
                                           List *returningList,
index 400a9b0cd7b95349ecb72406ce54937b5803bc39..e0a1d6febed9458f0f78223328fb1c0585beeb0d 100644 (file)
@@ -1082,14 +1082,14 @@ UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING
 UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING *;
 EXPLAIN (verbose, costs off)
 UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
-  FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;                               -- can't be pushed down
+  FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;                               -- can be pushed down
 UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
   FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
 EXPLAIN (verbose, costs off)
   DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;                               -- can be pushed down
 DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
 EXPLAIN (verbose, costs off)
-DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;                -- can't be pushed down
+DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;                -- can be pushed down
 DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;
 SELECT c1,c2,c3,c4 FROM ft2 ORDER BY c1;
 EXPLAIN (verbose, costs off)
@@ -1102,6 +1102,58 @@ EXPLAIN (verbose, costs off)
 DELETE FROM ft2 WHERE c1 = 9999 RETURNING tableoid::regclass;                       -- can be pushed down
 DELETE FROM ft2 WHERE c1 = 9999 RETURNING tableoid::regclass;
 
+-- Test UPDATE/DELETE with RETURNING on a three-table join
+INSERT INTO ft2 (c1,c2,c3)
+  SELECT id, id - 1200, to_char(id, 'FM00000') FROM generate_series(1201, 1300) id;
+EXPLAIN (verbose, costs off)
+UPDATE ft2 SET c3 = 'foo'
+  FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 1200 AND ft2.c2 = ft4.c1
+  RETURNING ft2.ctid, ft2, ft2.*, ft4.ctid, ft4, ft4.*;                             -- can be pushed down
+UPDATE ft2 SET c3 = 'foo'
+  FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 1200 AND ft2.c2 = ft4.c1
+  RETURNING ft2.ctid, ft2, ft2.*, ft4.ctid, ft4, ft4.*;
+EXPLAIN (verbose, costs off)
+DELETE FROM ft2
+  USING ft4 LEFT JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 1200 AND ft2.c1 % 10 = 0 AND ft2.c2 = ft4.c1
+  RETURNING 100;                                                                    -- can be pushed down
+DELETE FROM ft2
+  USING ft4 LEFT JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 1200 AND ft2.c1 % 10 = 0 AND ft2.c2 = ft4.c1
+  RETURNING 100;
+DELETE FROM ft2 WHERE ft2.c1 > 1200;
+
+-- Test UPDATE/DELETE with WHERE or JOIN/ON conditions containing
+-- user-defined operators/functions
+ALTER SERVER loopback OPTIONS (DROP extensions);
+INSERT INTO ft2 (c1,c2,c3)
+  SELECT id, id % 10, to_char(id, 'FM00000') FROM generate_series(2001, 2010) id;
+EXPLAIN (verbose, costs off)
+UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *;            -- can't be pushed down
+UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *;
+EXPLAIN (verbose, costs off)
+UPDATE ft2 SET c3 = 'baz'
+  FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 2000 AND ft2.c2 === ft4.c1
+  RETURNING ft2.*, ft4.*, ft5.*;                                                    -- can't be pushed down
+UPDATE ft2 SET c3 = 'baz'
+  FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
+  WHERE ft2.c1 > 2000 AND ft2.c2 === ft4.c1
+  RETURNING ft2.*, ft4.*, ft5.*;
+EXPLAIN (verbose, costs off)
+DELETE FROM ft2
+  USING ft4 INNER JOIN ft5 ON (ft4.c1 === ft5.c1)
+  WHERE ft2.c1 > 2000 AND ft2.c2 = ft4.c1
+  RETURNING ft2.ctid, ft2.c1, ft2.c2, ft2.c3;                                       -- can't be pushed down
+DELETE FROM ft2
+  USING ft4 INNER JOIN ft5 ON (ft4.c1 === ft5.c1)
+  WHERE ft2.c1 > 2000 AND ft2.c2 = ft4.c1
+  RETURNING ft2.ctid, ft2.c1, ft2.c2, ft2.c3;
+DELETE FROM ft2 WHERE ft2.c1 > 2000;
+ALTER SERVER loopback OPTIONS (ADD extensions 'postgres_fdw');
+
 -- Test that trigger on remote table works as expected
 CREATE OR REPLACE FUNCTION "S 1".F_BRTRIG() RETURNS trigger AS $$
 BEGIN