static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static PgFdwModifyState *create_foreign_modify(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ CmdType operation,
+ Plan *subplan,
+ char *query,
+ List *target_attrs,
+ bool has_returning,
+ List *retrieved_attrs);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
+static void finish_foreign_modify(PgFdwModifyState *fmstate);
static List *build_remote_returning(Index rtindex, Relation rel,
List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
int eflags)
{
PgFdwModifyState *fmstate;
- EState *estate = mtstate->ps.state;
- CmdType operation = mtstate->operation;
- Relation rel = resultRelInfo->ri_RelationDesc;
- RangeTblEntry *rte;
- Oid userid;
- ForeignTable *table;
- UserMapping *user;
- AttrNumber n_params;
- Oid typefnoid;
- bool isvarlena;
- ListCell *lc;
- TupleDesc tupdesc = RelationGetDescr(rel);
+ char *query;
+ List *target_attrs;
+ bool has_returning;
+ List *retrieved_attrs;
/*
* Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
return;
- /* Begin constructing PgFdwModifyState. */
- fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
- fmstate->rel = rel;
-
- /*
- * Identify which user to do the remote access as. This should match what
- * ExecCheckRTEPerms() does.
- */
- rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
- userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
-
- /* Get info about foreign table. */
- table = GetForeignTable(RelationGetRelid(rel));
- user = GetUserMapping(userid, table->serverid);
-
- /* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
- fmstate->p_name = NULL; /* prepared statement not made yet */
-
/* Deconstruct fdw_private data. */
- fmstate->query = strVal(list_nth(fdw_private,
- FdwModifyPrivateUpdateSql));
- fmstate->target_attrs = (List *) list_nth(fdw_private,
- FdwModifyPrivateTargetAttnums);
- fmstate->has_returning = intVal(list_nth(fdw_private,
- FdwModifyPrivateHasReturning));
- fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
- FdwModifyPrivateRetrievedAttrs);
-
- /* Create context for per-tuple temp workspace. */
- fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
- "postgres_fdw temporary data",
- ALLOCSET_SMALL_SIZES);
-
- /* Prepare for input conversion of RETURNING results. */
- if (fmstate->has_returning)
- fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
- /* Prepare for output conversion of parameters used in prepared stmt. */
- n_params = list_length(fmstate->target_attrs) + 1;
- fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
- fmstate->p_nums = 0;
-
- if (operation == CMD_UPDATE || operation == CMD_DELETE)
- {
- /* Find the ctid resjunk column in the subplan's result */
- Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
-
- fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
- "ctid");
- if (!AttributeNumberIsValid(fmstate->ctidAttno))
- elog(ERROR, "could not find junk ctid column");
-
- /* First transmittable parameter will be ctid */
- getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
- fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
- fmstate->p_nums++;
- }
-
- if (operation == CMD_INSERT || operation == CMD_UPDATE)
- {
- /* Set up for remaining transmittable parameters */
- foreach(lc, fmstate->target_attrs)
- {
- int attnum = lfirst_int(lc);
- Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
-
- Assert(!attr->attisdropped);
-
- getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
- fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
- fmstate->p_nums++;
- }
- }
-
- Assert(fmstate->p_nums <= n_params);
+ query = strVal(list_nth(fdw_private,
+ FdwModifyPrivateUpdateSql));
+ target_attrs = (List *) list_nth(fdw_private,
+ FdwModifyPrivateTargetAttnums);
+ has_returning = intVal(list_nth(fdw_private,
+ FdwModifyPrivateHasReturning));
+ retrieved_attrs = (List *) list_nth(fdw_private,
+ FdwModifyPrivateRetrievedAttrs);
+
+ /* Construct an execution state. */
+ fmstate = create_foreign_modify(mtstate->ps.state,
+ resultRelInfo,
+ mtstate->operation,
+ mtstate->mt_plans[subplan_index]->plan,
+ query,
+ target_attrs,
+ has_returning,
+ retrieved_attrs);
resultRelInfo->ri_FdwState = fmstate;
}
if (fmstate == NULL)
return;
- /* If we created a prepared statement, destroy it */
- if (fmstate->p_name)
- {
- char sql[64];
- PGresult *res;
-
- snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
- /*
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
- */
- res = pgfdw_exec_query(fmstate->conn, sql);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
- PQclear(res);
- fmstate->p_name = NULL;
- }
-
- /* Release remote connection */
- ReleaseConnection(fmstate->conn);
- fmstate->conn = NULL;
+ /* Destroy the execution state */
+ finish_foreign_modify(fmstate);
}
/*
PQclear(res);
}
+/*
+ * create_foreign_modify
+ * Construct an execution state of a foreign insert/update/delete
+ * operation
+ */
+static PgFdwModifyState *
+create_foreign_modify(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ CmdType operation,
+ Plan *subplan,
+ char *query,
+ List *target_attrs,
+ bool has_returning,
+ List *retrieved_attrs)
+{
+ PgFdwModifyState *fmstate;
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ TupleDesc tupdesc = RelationGetDescr(rel);
+ RangeTblEntry *rte;
+ Oid userid;
+ ForeignTable *table;
+ UserMapping *user;
+ AttrNumber n_params;
+ Oid typefnoid;
+ bool isvarlena;
+ ListCell *lc;
+
+ /* Begin constructing PgFdwModifyState. */
+ fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
+ fmstate->rel = rel;
+
+ /*
+ * Identify which user to do the remote access as. This should match what
+ * ExecCheckRTEPerms() does.
+ */
+ rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
+ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ /* Get info about foreign table. */
+ table = GetForeignTable(RelationGetRelid(rel));
+ user = GetUserMapping(userid, table->serverid);
+
+ /* Open connection; report that we'll create a prepared statement. */
+ fmstate->conn = GetConnection(user, true);
+ fmstate->p_name = NULL; /* prepared statement not made yet */
+
+ /* Set up remote query information. */
+ fmstate->query = query;
+ fmstate->target_attrs = target_attrs;
+ fmstate->has_returning = has_returning;
+ fmstate->retrieved_attrs = retrieved_attrs;
+
+ /* Create context for per-tuple temp workspace. */
+ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ "postgres_fdw temporary data",
+ ALLOCSET_SMALL_SIZES);
+
+ /* Prepare for input conversion of RETURNING results. */
+ if (fmstate->has_returning)
+ fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+ /* Prepare for output conversion of parameters used in prepared stmt. */
+ n_params = list_length(fmstate->target_attrs) + 1;
+ fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
+ fmstate->p_nums = 0;
+
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ Assert(subplan != NULL);
+
+ /* Find the ctid resjunk column in the subplan's result */
+ fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
+ "ctid");
+ if (!AttributeNumberIsValid(fmstate->ctidAttno))
+ elog(ERROR, "could not find junk ctid column");
+
+ /* First transmittable parameter will be ctid */
+ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ fmstate->p_nums++;
+ }
+
+ if (operation == CMD_INSERT || operation == CMD_UPDATE)
+ {
+ /* Set up for remaining transmittable parameters */
+ foreach(lc, fmstate->target_attrs)
+ {
+ int attnum = lfirst_int(lc);
+ Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+ Assert(!attr->attisdropped);
+
+ getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ fmstate->p_nums++;
+ }
+ }
+
+ Assert(fmstate->p_nums <= n_params);
+
+ return fmstate;
+}
+
/*
* prepare_foreign_modify
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
PG_END_TRY();
}
+/*
+ * finish_foreign_modify
+ * Release resources for a foreign insert/update/delete operation
+ */
+static void
+finish_foreign_modify(PgFdwModifyState *fmstate)
+{
+ Assert(fmstate != NULL);
+
+ /* If we created a prepared statement, destroy it */
+ if (fmstate->p_name)
+ {
+ char sql[64];
+ PGresult *res;
+
+ snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = pgfdw_exec_query(fmstate->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ PQclear(res);
+ fmstate->p_name = NULL;
+ }
+
+ /* Release remote connection */
+ ReleaseConnection(fmstate->conn);
+ fmstate->conn = NULL;
+}
+
/*
* build_remote_returning
* Build a RETURNING targetlist of a remote query for performing an