]> granicus.if.org Git - postgresql/commitdiff
Refactor PgFdwModifyState creation/destruction into separate functions.
authorRobert Haas <rhaas@postgresql.org>
Fri, 6 Apr 2018 15:29:43 +0000 (11:29 -0400)
committerRobert Haas <rhaas@postgresql.org>
Fri, 6 Apr 2018 15:29:43 +0000 (11:29 -0400)
Etsuro Fujita.  The larger patch series of which this is a part has
been reviewed by Amit Langote, David Fetter, Maksim Milyutin,
Álvaro Herrera, Stephen Frost, and me.

Discussion: http://postgr.es/m/5A95487E.9050808@lab.ntt.co.jp

contrib/postgres_fdw/postgres_fdw.c

index a15ce28a48b3b575b9a6166229f83502d12198b1..e7441c759ba0d1a6b4773f9ecff2532d4751626b 100644 (file)
@@ -376,12 +376,21 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void create_cursor(ForeignScanState *node);
 static void fetch_more_data(ForeignScanState *node);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static PgFdwModifyState *create_foreign_modify(EState *estate,
+                                         ResultRelInfo *resultRelInfo,
+                                         CmdType operation,
+                                         Plan *subplan,
+                                         char *query,
+                                         List *target_attrs,
+                                         bool has_returning,
+                                         List *retrieved_attrs);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
                                                 ItemPointer tupleid,
                                                 TupleTableSlot *slot);
 static void store_returning_result(PgFdwModifyState *fmstate,
                                           TupleTableSlot *slot, PGresult *res);
+static void finish_foreign_modify(PgFdwModifyState *fmstate);
 static List *build_remote_returning(Index rtindex, Relation rel,
                                           List *returningList);
 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -1681,18 +1690,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
                                                   int eflags)
 {
        PgFdwModifyState *fmstate;
-       EState     *estate = mtstate->ps.state;
-       CmdType         operation = mtstate->operation;
-       Relation        rel = resultRelInfo->ri_RelationDesc;
-       RangeTblEntry *rte;
-       Oid                     userid;
-       ForeignTable *table;
-       UserMapping *user;
-       AttrNumber      n_params;
-       Oid                     typefnoid;
-       bool            isvarlena;
-       ListCell   *lc;
-       TupleDesc       tupdesc = RelationGetDescr(rel);
+       char       *query;
+       List       *target_attrs;
+       bool            has_returning;
+       List       *retrieved_attrs;
 
        /*
         * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
@@ -1701,82 +1702,25 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
        if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
                return;
 
-       /* Begin constructing PgFdwModifyState. */
-       fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
-       fmstate->rel = rel;
-
-       /*
-        * Identify which user to do the remote access as.  This should match what
-        * ExecCheckRTEPerms() does.
-        */
-       rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
-       userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
-
-       /* Get info about foreign table. */
-       table = GetForeignTable(RelationGetRelid(rel));
-       user = GetUserMapping(userid, table->serverid);
-
-       /* Open connection; report that we'll create a prepared statement. */
-       fmstate->conn = GetConnection(user, true);
-       fmstate->p_name = NULL;         /* prepared statement not made yet */
-
        /* Deconstruct fdw_private data. */
-       fmstate->query = strVal(list_nth(fdw_private,
-                                                                        FdwModifyPrivateUpdateSql));
-       fmstate->target_attrs = (List *) list_nth(fdw_private,
-                                                                                         FdwModifyPrivateTargetAttnums);
-       fmstate->has_returning = intVal(list_nth(fdw_private,
-                                                                                        FdwModifyPrivateHasReturning));
-       fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
-                                                                                                FdwModifyPrivateRetrievedAttrs);
-
-       /* Create context for per-tuple temp workspace. */
-       fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
-                                                                                         "postgres_fdw temporary data",
-                                                                                         ALLOCSET_SMALL_SIZES);
-
-       /* Prepare for input conversion of RETURNING results. */
-       if (fmstate->has_returning)
-               fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-       /* Prepare for output conversion of parameters used in prepared stmt. */
-       n_params = list_length(fmstate->target_attrs) + 1;
-       fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
-       fmstate->p_nums = 0;
-
-       if (operation == CMD_UPDATE || operation == CMD_DELETE)
-       {
-               /* Find the ctid resjunk column in the subplan's result */
-               Plan       *subplan = mtstate->mt_plans[subplan_index]->plan;
-
-               fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
-                                                                                                                 "ctid");
-               if (!AttributeNumberIsValid(fmstate->ctidAttno))
-                       elog(ERROR, "could not find junk ctid column");
-
-               /* First transmittable parameter will be ctid */
-               getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
-               fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
-               fmstate->p_nums++;
-       }
-
-       if (operation == CMD_INSERT || operation == CMD_UPDATE)
-       {
-               /* Set up for remaining transmittable parameters */
-               foreach(lc, fmstate->target_attrs)
-               {
-                       int                     attnum = lfirst_int(lc);
-                       Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
-
-                       Assert(!attr->attisdropped);
-
-                       getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
-                       fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
-                       fmstate->p_nums++;
-               }
-       }
-
-       Assert(fmstate->p_nums <= n_params);
+       query = strVal(list_nth(fdw_private,
+                                                       FdwModifyPrivateUpdateSql));
+       target_attrs = (List *) list_nth(fdw_private,
+                                                                        FdwModifyPrivateTargetAttnums);
+       has_returning = intVal(list_nth(fdw_private,
+                                                                       FdwModifyPrivateHasReturning));
+       retrieved_attrs = (List *) list_nth(fdw_private,
+                                                                               FdwModifyPrivateRetrievedAttrs);
+
+       /* Construct an execution state. */
+       fmstate = create_foreign_modify(mtstate->ps.state,
+                                                                       resultRelInfo,
+                                                                       mtstate->operation,
+                                                                       mtstate->mt_plans[subplan_index]->plan,
+                                                                       query,
+                                                                       target_attrs,
+                                                                       has_returning,
+                                                                       retrieved_attrs);
 
        resultRelInfo->ri_FdwState = fmstate;
 }
@@ -2011,28 +1955,8 @@ postgresEndForeignModify(EState *estate,
        if (fmstate == NULL)
                return;
 
-       /* If we created a prepared statement, destroy it */
-       if (fmstate->p_name)
-       {
-               char            sql[64];
-               PGresult   *res;
-
-               snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
-               /*
-                * We don't use a PG_TRY block here, so be careful not to throw error
-                * without releasing the PGresult.
-                */
-               res = pgfdw_exec_query(fmstate->conn, sql);
-               if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                       pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
-               PQclear(res);
-               fmstate->p_name = NULL;
-       }
-
-       /* Release remote connection */
-       ReleaseConnection(fmstate->conn);
-       fmstate->conn = NULL;
+       /* Destroy the execution state */
+       finish_foreign_modify(fmstate);
 }
 
 /*
@@ -3228,6 +3152,109 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
        PQclear(res);
 }
 
+/*
+ * create_foreign_modify
+ *             Construct an execution state of a foreign insert/update/delete
+ *             operation
+ */
+static PgFdwModifyState *
+create_foreign_modify(EState *estate,
+                                         ResultRelInfo *resultRelInfo,
+                                         CmdType operation,
+                                         Plan *subplan,
+                                         char *query,
+                                         List *target_attrs,
+                                         bool has_returning,
+                                         List *retrieved_attrs)
+{
+       PgFdwModifyState *fmstate;
+       Relation        rel = resultRelInfo->ri_RelationDesc;
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+       RangeTblEntry *rte;
+       Oid                     userid;
+       ForeignTable *table;
+       UserMapping *user;
+       AttrNumber      n_params;
+       Oid                     typefnoid;
+       bool            isvarlena;
+       ListCell   *lc;
+
+       /* Begin constructing PgFdwModifyState. */
+       fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
+       fmstate->rel = rel;
+
+       /*
+        * Identify which user to do the remote access as.  This should match what
+        * ExecCheckRTEPerms() does.
+        */
+       rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
+       userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+       /* Get info about foreign table. */
+       table = GetForeignTable(RelationGetRelid(rel));
+       user = GetUserMapping(userid, table->serverid);
+
+       /* Open connection; report that we'll create a prepared statement. */
+       fmstate->conn = GetConnection(user, true);
+       fmstate->p_name = NULL;         /* prepared statement not made yet */
+
+       /* Set up remote query information. */
+       fmstate->query = query;
+       fmstate->target_attrs = target_attrs;
+       fmstate->has_returning = has_returning;
+       fmstate->retrieved_attrs = retrieved_attrs;
+
+       /* Create context for per-tuple temp workspace. */
+       fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+                                                                                         "postgres_fdw temporary data",
+                                                                                         ALLOCSET_SMALL_SIZES);
+
+       /* Prepare for input conversion of RETURNING results. */
+       if (fmstate->has_returning)
+               fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+       /* Prepare for output conversion of parameters used in prepared stmt. */
+       n_params = list_length(fmstate->target_attrs) + 1;
+       fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
+       fmstate->p_nums = 0;
+
+       if (operation == CMD_UPDATE || operation == CMD_DELETE)
+       {
+               Assert(subplan != NULL);
+
+               /* Find the ctid resjunk column in the subplan's result */
+               fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
+                                                                                                                 "ctid");
+               if (!AttributeNumberIsValid(fmstate->ctidAttno))
+                       elog(ERROR, "could not find junk ctid column");
+
+               /* First transmittable parameter will be ctid */
+               getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+               fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+               fmstate->p_nums++;
+       }
+
+       if (operation == CMD_INSERT || operation == CMD_UPDATE)
+       {
+               /* Set up for remaining transmittable parameters */
+               foreach(lc, fmstate->target_attrs)
+               {
+                       int                     attnum = lfirst_int(lc);
+                       Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+                       Assert(!attr->attisdropped);
+
+                       getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+                       fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+                       fmstate->p_nums++;
+               }
+       }
+
+       Assert(fmstate->p_nums <= n_params);
+
+       return fmstate;
+}
+
 /*
  * prepare_foreign_modify
  *             Establish a prepared statement for execution of INSERT/UPDATE/DELETE
@@ -3370,6 +3397,39 @@ store_returning_result(PgFdwModifyState *fmstate,
        PG_END_TRY();
 }
 
+/*
+ * finish_foreign_modify
+ *             Release resources for a foreign insert/update/delete operation
+ */
+static void
+finish_foreign_modify(PgFdwModifyState *fmstate)
+{
+       Assert(fmstate != NULL);
+
+       /* If we created a prepared statement, destroy it */
+       if (fmstate->p_name)
+       {
+               char            sql[64];
+               PGresult   *res;
+
+               snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+
+               /*
+                * We don't use a PG_TRY block here, so be careful not to throw error
+                * without releasing the PGresult.
+                */
+               res = pgfdw_exec_query(fmstate->conn, sql);
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+               PQclear(res);
+               fmstate->p_name = NULL;
+       }
+
+       /* Release remote connection */
+       ReleaseConnection(fmstate->conn);
+       fmstate->conn = NULL;
+}
+
 /*
  * build_remote_returning
  *             Build a RETURNING targetlist of a remote query for performing an