1 /*-------------------------------------------------------------------------
4 * Foreign-data wrapper for remote PostgreSQL servers
6 * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
9 * contrib/postgres_fdw/postgres_fdw.c
11 *-------------------------------------------------------------------------
15 #include "postgres_fdw.h"
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "commands/defrem.h"
20 #include "commands/explain.h"
21 #include "commands/vacuum.h"
22 #include "foreign/fdwapi.h"
24 #include "miscadmin.h"
25 #include "nodes/makefuncs.h"
26 #include "nodes/nodeFuncs.h"
27 #include "optimizer/cost.h"
28 #include "optimizer/pathnode.h"
29 #include "optimizer/paths.h"
30 #include "optimizer/planmain.h"
31 #include "optimizer/prep.h"
32 #include "optimizer/restrictinfo.h"
33 #include "optimizer/var.h"
34 #include "parser/parsetree.h"
35 #include "utils/builtins.h"
36 #include "utils/guc.h"
37 #include "utils/lsyscache.h"
38 #include "utils/memutils.h"
43 /* Default CPU cost to start up a foreign query. */
44 #define DEFAULT_FDW_STARTUP_COST 100.0
46 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
47 #define DEFAULT_FDW_TUPLE_COST 0.01
50 * FDW-specific planner information kept in RelOptInfo.fdw_private for a
51 * foreign table. This information is collected by postgresGetForeignRelSize.
53 typedef struct PgFdwRelationInfo
55 /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */
59 /* Bitmap of attr numbers we need to fetch from the remote server. */
60 Bitmapset *attrs_used;
62 /* Cost and selectivity of local_conds. */
63 QualCost local_conds_cost;
64 Selectivity local_conds_sel;
66 /* Estimated size and cost for a scan with baserestrictinfo quals. */
72 /* Options extracted from catalogs. */
73 bool use_remote_estimate;
74 Cost fdw_startup_cost;
77 /* Cached catalog information. */
79 ForeignServer *server;
80 UserMapping *user; /* only set in use_remote_estimate mode */
84 * Indexes of FDW-private information stored in fdw_private lists.
86 * We store various information in ForeignScan.fdw_private to pass it from
87 * planner to executor. Currently we store:
89 * 1) SELECT statement text to be sent to the remote server
90 * 2) Integer list of attribute numbers retrieved by the SELECT
92 * These items are indexed with the enum FdwScanPrivateIndex, so an item
93 * can be fetched with list_nth(). For example, to get the SELECT statement:
94 * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
96 enum FdwScanPrivateIndex
98 /* SQL statement to execute remotely (as a String node) */
99 FdwScanPrivateSelectSql,
100 /* Integer list of attribute numbers retrieved by the SELECT */
101 FdwScanPrivateRetrievedAttrs
105 * Similarly, this enum describes what's kept in the fdw_private list for
106 * a ModifyTable node referencing a postgres_fdw foreign table. We store:
108 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
109 * 2) Integer list of target attribute numbers for INSERT/UPDATE
111 * 3) Boolean flag showing if there's a RETURNING clause
112 * 4) Integer list of attribute numbers retrieved by RETURNING, if any
114 enum FdwModifyPrivateIndex
116 /* SQL statement to execute remotely (as a String node) */
117 FdwModifyPrivateUpdateSql,
118 /* Integer list of target attribute numbers for INSERT/UPDATE */
119 FdwModifyPrivateTargetAttnums,
120 /* has-returning flag (as an integer Value node) */
121 FdwModifyPrivateHasReturning,
122 /* Integer list of attribute numbers retrieved by RETURNING */
123 FdwModifyPrivateRetrievedAttrs
127 * Execution state of a foreign scan using postgres_fdw.
129 typedef struct PgFdwScanState
131 Relation rel; /* relcache entry for the foreign table */
132 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
134 /* extracted fdw_private data */
135 char *query; /* text of SELECT command */
136 List *retrieved_attrs; /* list of retrieved attribute numbers */
138 /* for remote query execution */
139 PGconn *conn; /* connection for the scan */
140 unsigned int cursor_number; /* quasi-unique ID for my cursor */
141 bool cursor_exists; /* have we created the cursor? */
142 int numParams; /* number of parameters passed to query */
143 FmgrInfo *param_flinfo; /* output conversion functions for them */
144 List *param_exprs; /* executable expressions for param values */
145 const char **param_values; /* textual values of query parameters */
147 /* for storing result tuples */
148 HeapTuple *tuples; /* array of currently-retrieved tuples */
149 int num_tuples; /* # of tuples in array */
150 int next_tuple; /* index of next one to return */
152 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
153 int fetch_ct_2; /* Min(# of fetches done, 2) */
154 bool eof_reached; /* true if last fetch reached EOF */
156 /* working memory contexts */
157 MemoryContext batch_cxt; /* context holding current batch of tuples */
158 MemoryContext temp_cxt; /* context for per-tuple temporary data */
162 * Execution state of a foreign insert/update/delete operation.
164 typedef struct PgFdwModifyState
166 Relation rel; /* relcache entry for the foreign table */
167 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
169 /* for remote query execution */
170 PGconn *conn; /* connection for the scan */
171 char *p_name; /* name of prepared statement, if created */
173 /* extracted fdw_private data */
174 char *query; /* text of INSERT/UPDATE/DELETE command */
175 List *target_attrs; /* list of target attribute numbers */
176 bool has_returning; /* is there a RETURNING clause? */
177 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
179 /* info about parameters for prepared statement */
180 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
181 int p_nums; /* number of parameters to transmit */
182 FmgrInfo *p_flinfo; /* output conversion functions for them */
184 /* working memory context */
185 MemoryContext temp_cxt; /* context for per-tuple temporary data */
189 * Workspace for analyzing a foreign table.
191 typedef struct PgFdwAnalyzeState
193 Relation rel; /* relcache entry for the foreign table */
194 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
195 List *retrieved_attrs; /* attr numbers retrieved by query */
197 /* collected sample rows */
198 HeapTuple *rows; /* array of size targrows */
199 int targrows; /* target # of sample rows */
200 int numrows; /* # of sample rows collected */
202 /* for random sampling */
203 double samplerows; /* # of rows fetched */
204 double rowstoskip; /* # of rows to skip before next sample */
205 double rstate; /* random state */
207 /* working memory contexts */
208 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
209 MemoryContext temp_cxt; /* context for per-tuple temporary data */
213 * Identify the attribute where data conversion fails.
215 typedef struct ConversionLocation
217 Relation rel; /* foreign table's relcache entry */
218 AttrNumber cur_attno; /* attribute number being processed, or 0 */
219 } ConversionLocation;
221 /* Callback argument for ec_member_matches_foreign */
224 Expr *current; /* current expr, or NULL if not yet found */
225 List *already_used; /* expressions already dealt with */
226 } ec_member_foreign_arg;
231 extern Datum postgres_fdw_handler(PG_FUNCTION_ARGS);
233 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
236 * FDW callback routines
238 static void postgresGetForeignRelSize(PlannerInfo *root,
241 static void postgresGetForeignPaths(PlannerInfo *root,
244 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
247 ForeignPath *best_path,
250 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
251 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
252 static void postgresReScanForeignScan(ForeignScanState *node);
253 static void postgresEndForeignScan(ForeignScanState *node);
254 static void postgresAddForeignUpdateTargets(Query *parsetree,
255 RangeTblEntry *target_rte,
256 Relation target_relation);
257 static List *postgresPlanForeignModify(PlannerInfo *root,
259 Index resultRelation,
261 static void postgresBeginForeignModify(ModifyTableState *mtstate,
262 ResultRelInfo *resultRelInfo,
266 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
267 ResultRelInfo *resultRelInfo,
268 TupleTableSlot *slot,
269 TupleTableSlot *planSlot);
270 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
271 ResultRelInfo *resultRelInfo,
272 TupleTableSlot *slot,
273 TupleTableSlot *planSlot);
274 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
275 ResultRelInfo *resultRelInfo,
276 TupleTableSlot *slot,
277 TupleTableSlot *planSlot);
278 static void postgresEndForeignModify(EState *estate,
279 ResultRelInfo *resultRelInfo);
280 static int postgresIsForeignRelUpdatable(Relation rel);
281 static void postgresExplainForeignScan(ForeignScanState *node,
283 static void postgresExplainForeignModify(ModifyTableState *mtstate,
284 ResultRelInfo *rinfo,
288 static bool postgresAnalyzeForeignTable(Relation relation,
289 AcquireSampleRowsFunc *func,
290 BlockNumber *totalpages);
295 static void estimate_path_cost_size(PlannerInfo *root,
298 double *p_rows, int *p_width,
299 Cost *p_startup_cost, Cost *p_total_cost);
300 static void get_remote_estimate(const char *sql,
306 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
307 EquivalenceClass *ec, EquivalenceMember *em,
309 static void create_cursor(ForeignScanState *node);
310 static void fetch_more_data(ForeignScanState *node);
311 static void close_cursor(PGconn *conn, unsigned int cursor_number);
312 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
313 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
315 TupleTableSlot *slot);
316 static void store_returning_result(PgFdwModifyState *fmstate,
317 TupleTableSlot *slot, PGresult *res);
318 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
319 HeapTuple *rows, int targrows,
321 double *totaldeadrows);
322 static void analyze_row_processor(PGresult *res, int row,
323 PgFdwAnalyzeState *astate);
324 static HeapTuple make_tuple_from_result_row(PGresult *res,
327 AttInMetadata *attinmeta,
328 List *retrieved_attrs,
329 MemoryContext temp_context);
330 static void conversion_error_callback(void *arg);
334 * Foreign-data wrapper handler function: return a struct with pointers
335 * to my callback routines.
338 postgres_fdw_handler(PG_FUNCTION_ARGS)
340 FdwRoutine *routine = makeNode(FdwRoutine);
342 /* Functions for scanning foreign tables */
343 routine->GetForeignRelSize = postgresGetForeignRelSize;
344 routine->GetForeignPaths = postgresGetForeignPaths;
345 routine->GetForeignPlan = postgresGetForeignPlan;
346 routine->BeginForeignScan = postgresBeginForeignScan;
347 routine->IterateForeignScan = postgresIterateForeignScan;
348 routine->ReScanForeignScan = postgresReScanForeignScan;
349 routine->EndForeignScan = postgresEndForeignScan;
351 /* Functions for updating foreign tables */
352 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
353 routine->PlanForeignModify = postgresPlanForeignModify;
354 routine->BeginForeignModify = postgresBeginForeignModify;
355 routine->ExecForeignInsert = postgresExecForeignInsert;
356 routine->ExecForeignUpdate = postgresExecForeignUpdate;
357 routine->ExecForeignDelete = postgresExecForeignDelete;
358 routine->EndForeignModify = postgresEndForeignModify;
359 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
361 /* Support functions for EXPLAIN */
362 routine->ExplainForeignScan = postgresExplainForeignScan;
363 routine->ExplainForeignModify = postgresExplainForeignModify;
365 /* Support functions for ANALYZE */
366 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
368 PG_RETURN_POINTER(routine);
372 * postgresGetForeignRelSize
373 * Estimate # of rows and width of the result of the scan
375 * We should consider the effect of all baserestrictinfo clauses here, but
376 * not any join clauses.
379 postgresGetForeignRelSize(PlannerInfo *root,
383 PgFdwRelationInfo *fpinfo;
387 * We use PgFdwRelationInfo to pass various information to subsequent
390 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
391 baserel->fdw_private = (void *) fpinfo;
393 /* Look up foreign-table catalog info. */
394 fpinfo->table = GetForeignTable(foreigntableid);
395 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
398 * Extract user-settable option values. Note that per-table setting of
399 * use_remote_estimate overrides per-server setting.
401 fpinfo->use_remote_estimate = false;
402 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
403 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
405 foreach(lc, fpinfo->server->options)
407 DefElem *def = (DefElem *) lfirst(lc);
409 if (strcmp(def->defname, "use_remote_estimate") == 0)
410 fpinfo->use_remote_estimate = defGetBoolean(def);
411 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
412 fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
413 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
414 fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
416 foreach(lc, fpinfo->table->options)
418 DefElem *def = (DefElem *) lfirst(lc);
420 if (strcmp(def->defname, "use_remote_estimate") == 0)
422 fpinfo->use_remote_estimate = defGetBoolean(def);
423 break; /* only need the one value */
428 * If the table or the server is configured to use remote estimates,
429 * identify which user to do remote access as during planning. This
430 * should match what ExecCheckRTEPerms() does. If we fail due to lack of
431 * permissions, the query would have failed at runtime anyway.
433 if (fpinfo->use_remote_estimate)
435 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
436 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
438 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
444 * Identify which baserestrictinfo clauses can be sent to the remote
445 * server and which can't.
447 classifyConditions(root, baserel,
448 &fpinfo->remote_conds, &fpinfo->local_conds);
451 * Identify which attributes will need to be retrieved from the remote
452 * server. These include all attrs needed for joins or final output, plus
453 * all attrs used in the local_conds. (Note: if we end up using a
454 * parameterized scan, it's possible that some of the join clauses will be
455 * sent to the remote and thus we wouldn't really need to retrieve the
456 * columns used in them. Doesn't seem worth detecting that case though.)
458 fpinfo->attrs_used = NULL;
459 pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
460 &fpinfo->attrs_used);
461 foreach(lc, fpinfo->local_conds)
463 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
465 pull_varattnos((Node *) rinfo->clause, baserel->relid,
466 &fpinfo->attrs_used);
470 * Compute the selectivity and cost of the local_conds, so we don't have
471 * to do it over again for each path. The best we can do for these
472 * conditions is to estimate selectivity on the basis of local statistics.
474 fpinfo->local_conds_sel = clauselist_selectivity(root,
480 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
483 * If the table or the server is configured to use remote estimates,
484 * connect to the foreign server and execute EXPLAIN to estimate the
485 * number of rows selected by the restriction clauses, as well as the
486 * average row width. Otherwise, estimate using whatever statistics we
487 * have locally, in a way similar to ordinary tables.
489 if (fpinfo->use_remote_estimate)
492 * Get cost/size estimates with help of remote server. Save the
493 * values in fpinfo so we don't need to do it again to generate the
494 * basic foreign path.
496 estimate_path_cost_size(root, baserel, NIL,
497 &fpinfo->rows, &fpinfo->width,
498 &fpinfo->startup_cost, &fpinfo->total_cost);
500 /* Report estimated baserel size to planner. */
501 baserel->rows = fpinfo->rows;
502 baserel->width = fpinfo->width;
507 * If the foreign table has never been ANALYZEd, it will have relpages
508 * and reltuples equal to zero, which most likely has nothing to do
509 * with reality. We can't do a whole lot about that if we're not
510 * allowed to consult the remote server, but we can use a hack similar
511 * to plancat.c's treatment of empty relations: use a minimum size
512 * estimate of 10 pages, and divide by the column-datatype-based width
513 * estimate to get the corresponding number of tuples.
515 if (baserel->pages == 0 && baserel->tuples == 0)
519 (10 * BLCKSZ) / (baserel->width + sizeof(HeapTupleHeaderData));
522 /* Estimate baserel size as best we can with local statistics. */
523 set_baserel_size_estimates(root, baserel);
525 /* Fill in basically-bogus cost estimates for use later. */
526 estimate_path_cost_size(root, baserel, NIL,
527 &fpinfo->rows, &fpinfo->width,
528 &fpinfo->startup_cost, &fpinfo->total_cost);
533 * postgresGetForeignPaths
534 * Create possible scan paths for a scan on the foreign table
537 postgresGetForeignPaths(PlannerInfo *root,
541 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
544 Relids required_outer;
552 * Create simplest ForeignScan path node and add it to baserel. This path
553 * corresponds to SeqScan path of regular tables (though depending on what
554 * baserestrict conditions we were able to send to remote, there might
555 * actually be an indexscan happening there). We already did all the work
556 * to estimate cost and size of this path.
558 path = create_foreignscan_path(root, baserel,
560 fpinfo->startup_cost,
562 NIL, /* no pathkeys */
563 NULL, /* no outer rel either */
564 NIL); /* no fdw_private list */
565 add_path(baserel, (Path *) path);
568 * If we're not using remote estimates, stop here. We have no way to
569 * estimate whether any join clauses would be worth sending across, so
570 * don't bother building parameterized paths.
572 if (!fpinfo->use_remote_estimate)
576 * As a crude first hack, we consider each available join clause and try
577 * to make a parameterized path using just that clause. Later we should
578 * consider combinations of clauses, probably.
581 /* Scan the rel's join clauses */
582 foreach(lc, baserel->joininfo)
584 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
586 /* Check if clause can be moved to this rel */
587 if (!join_clause_is_movable_to(rinfo, baserel))
590 /* See if it is safe to send to remote */
591 if (!is_foreign_expr(root, baserel, rinfo->clause))
595 * OK, get a cost estimate from the remote, and make a path.
597 join_quals = list_make1(rinfo);
598 estimate_path_cost_size(root, baserel, join_quals,
600 &startup_cost, &total_cost);
602 /* Must calculate required outer rels for this path */
603 required_outer = bms_union(rinfo->clause_relids,
604 baserel->lateral_relids);
605 /* We do not want the foreign rel itself listed in required_outer */
606 required_outer = bms_del_member(required_outer, baserel->relid);
607 /* Enforce convention that required_outer is exactly NULL if empty */
608 if (bms_is_empty(required_outer))
609 required_outer = NULL;
611 path = create_foreignscan_path(root, baserel,
615 NIL, /* no pathkeys */
617 NIL); /* no fdw_private list */
618 add_path(baserel, (Path *) path);
622 * The above scan examined only "generic" join clauses, not those that
623 * were absorbed into EquivalenceClauses. See if we can make anything out
624 * of EquivalenceClauses.
626 if (baserel->has_eclass_joins)
629 * We repeatedly scan the eclass list looking for column references
630 * (or expressions) belonging to the foreign rel. Each time we find
631 * one, we generate a list of equivalence joinclauses for it, and then
632 * try to make those into foreign paths. Repeat till there are no
633 * more candidate EC members.
635 ec_member_foreign_arg arg;
637 arg.already_used = NIL;
642 /* Make clauses, skipping any that join to lateral_referencers */
644 clauses = generate_implied_equalities_for_column(root,
646 ec_member_matches_foreign,
648 baserel->lateral_referencers);
650 /* Done if there are no more expressions in the foreign rel */
651 if (arg.current == NULL)
653 Assert(clauses == NIL);
657 /* Scan the extracted join clauses */
660 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
662 /* Check if clause can be moved to this rel */
663 if (!join_clause_is_movable_to(rinfo, baserel))
666 /* See if it is safe to send to remote */
667 if (!is_foreign_expr(root, baserel, rinfo->clause))
671 * OK, get a cost estimate from the remote, and make a path.
673 join_quals = list_make1(rinfo);
674 estimate_path_cost_size(root, baserel, join_quals,
676 &startup_cost, &total_cost);
678 /* Must calculate required outer rels for this path */
679 required_outer = bms_union(rinfo->clause_relids,
680 baserel->lateral_relids);
681 required_outer = bms_del_member(required_outer, baserel->relid);
682 if (bms_is_empty(required_outer))
683 required_outer = NULL;
685 path = create_foreignscan_path(root, baserel,
689 NIL, /* no pathkeys */
691 NIL); /* no fdw_private */
692 add_path(baserel, (Path *) path);
695 /* Try again, now ignoring the expression we found this time */
696 arg.already_used = lappend(arg.already_used, arg.current);
702 * postgresGetForeignPlan
703 * Create ForeignScan plan node which implements selected best path
706 postgresGetForeignPlan(PlannerInfo *root,
709 ForeignPath *best_path,
713 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
714 Index scan_relid = baserel->relid;
716 List *remote_conds = NIL;
717 List *local_exprs = NIL;
718 List *params_list = NIL;
719 List *retrieved_attrs;
724 * Separate the scan_clauses into those that can be executed remotely and
725 * those that can't. baserestrictinfo clauses that were previously
726 * determined to be safe or unsafe by classifyConditions are shown in
727 * fpinfo->remote_conds and fpinfo->local_conds. Anything else in the
728 * scan_clauses list should be a join clause that was found safe by
729 * postgresGetForeignPaths.
731 * Note: for clauses extracted from EquivalenceClasses, it's possible that
732 * what we get here is a different representation of the clause than what
733 * postgresGetForeignPaths saw; for example we might get a commuted
734 * version of the clause. So we can't insist on simple equality as we do
735 * for the baserestrictinfo clauses.
737 * This code must match "extract_actual_clauses(scan_clauses, false)"
738 * except for the additional decision about remote versus local execution.
739 * Note however that we only strip the RestrictInfo nodes from the
740 * local_exprs list, since appendWhereClause expects a list of
743 foreach(lc, scan_clauses)
745 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
747 Assert(IsA(rinfo, RestrictInfo));
749 /* Ignore any pseudoconstants, they're dealt with elsewhere */
750 if (rinfo->pseudoconstant)
753 if (list_member_ptr(fpinfo->remote_conds, rinfo))
754 remote_conds = lappend(remote_conds, rinfo);
755 else if (list_member_ptr(fpinfo->local_conds, rinfo))
756 local_exprs = lappend(local_exprs, rinfo->clause);
759 Assert(is_foreign_expr(root, baserel, rinfo->clause));
760 remote_conds = lappend(remote_conds, rinfo);
765 * Build the query string to be sent for execution, and identify
766 * expressions to be sent as parameters.
768 initStringInfo(&sql);
769 deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
772 appendWhereClause(&sql, root, baserel, remote_conds,
776 * Add FOR UPDATE/SHARE if appropriate. We apply locking during the
777 * initial row fetch, rather than later on as is done for local tables.
778 * The extra roundtrips involved in trying to duplicate the local
779 * semantics exactly don't seem worthwhile (see also comments for
782 * Note: because we actually run the query as a cursor, this assumes that
783 * DECLARE CURSOR ... FOR UPDATE is supported, which it isn't before 8.3.
785 if (baserel->relid == root->parse->resultRelation &&
786 (root->parse->commandType == CMD_UPDATE ||
787 root->parse->commandType == CMD_DELETE))
789 /* Relation is UPDATE/DELETE target, so use FOR UPDATE */
790 appendStringInfoString(&sql, " FOR UPDATE");
794 RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid);
799 * Relation is specified as a FOR UPDATE/SHARE target, so handle
802 * For now, just ignore any [NO] KEY specification, since (a) it's
803 * not clear what that means for a remote table that we don't have
804 * complete information about, and (b) it wouldn't work anyway on
805 * older remote servers. Likewise, we don't worry about NOWAIT.
807 switch (rc->strength)
809 case LCS_FORKEYSHARE:
811 appendStringInfoString(&sql, " FOR SHARE");
813 case LCS_FORNOKEYUPDATE:
815 appendStringInfoString(&sql, " FOR UPDATE");
822 * Build the fdw_private list that will be available to the executor.
823 * Items in the list must match enum FdwScanPrivateIndex, above.
825 fdw_private = list_make2(makeString(sql.data),
829 * Create the ForeignScan node from target list, local filtering
830 * expressions, remote parameter expressions, and FDW private information.
832 * Note that the remote parameter expressions are stored in the fdw_exprs
833 * field of the finished plan node; we can't keep them in private state
834 * because then they wouldn't be subject to later planner processing.
836 return make_foreignscan(tlist,
844 * postgresBeginForeignScan
845 * Initiate an executor scan of a foreign PostgreSQL table.
848 postgresBeginForeignScan(ForeignScanState *node, int eflags)
850 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
851 EState *estate = node->ss.ps.state;
852 PgFdwScanState *fsstate;
856 ForeignServer *server;
863 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
865 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
869 * We'll save private state in node->fdw_state.
871 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
872 node->fdw_state = (void *) fsstate;
875 * Identify which user to do the remote access as. This should match what
876 * ExecCheckRTEPerms() does.
878 rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
879 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
881 /* Get info about foreign table. */
882 fsstate->rel = node->ss.ss_currentRelation;
883 table = GetForeignTable(RelationGetRelid(fsstate->rel));
884 server = GetForeignServer(table->serverid);
885 user = GetUserMapping(userid, server->serverid);
888 * Get connection to the foreign server. Connection manager will
889 * establish new connection if necessary.
891 fsstate->conn = GetConnection(server, user, false);
893 /* Assign a unique ID for my cursor */
894 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
895 fsstate->cursor_exists = false;
897 /* Get private info created by planner functions. */
898 fsstate->query = strVal(list_nth(fsplan->fdw_private,
899 FdwScanPrivateSelectSql));
900 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
901 FdwScanPrivateRetrievedAttrs);
903 /* Create contexts for batches of tuples and per-tuple temp workspace. */
904 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
905 "postgres_fdw tuple data",
906 ALLOCSET_DEFAULT_MINSIZE,
907 ALLOCSET_DEFAULT_INITSIZE,
908 ALLOCSET_DEFAULT_MAXSIZE);
909 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
910 "postgres_fdw temporary data",
911 ALLOCSET_SMALL_MINSIZE,
912 ALLOCSET_SMALL_INITSIZE,
913 ALLOCSET_SMALL_MAXSIZE);
915 /* Get info we'll need for input data conversion. */
916 fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
918 /* Prepare for output conversion of parameters used in remote query. */
919 numParams = list_length(fsplan->fdw_exprs);
920 fsstate->numParams = numParams;
921 fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
924 foreach(lc, fsplan->fdw_exprs)
926 Node *param_expr = (Node *) lfirst(lc);
930 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
931 fmgr_info(typefnoid, &fsstate->param_flinfo[i]);
936 * Prepare remote-parameter expressions for evaluation. (Note: in
937 * practice, we expect that all these expressions will be just Params, so
938 * we could possibly do something more efficient than using the full
939 * expression-eval machinery for this. But probably there would be little
940 * benefit, and it'd require postgres_fdw to know more than is desirable
941 * about Param evaluation.)
943 fsstate->param_exprs = (List *)
944 ExecInitExpr((Expr *) fsplan->fdw_exprs,
948 * Allocate buffer for text form of query parameters, if any.
951 fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
953 fsstate->param_values = NULL;
957 * postgresIterateForeignScan
958 * Retrieve next row from the result set, or clear tuple slot to indicate
961 static TupleTableSlot *
962 postgresIterateForeignScan(ForeignScanState *node)
964 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
965 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
968 * If this is the first call after Begin or ReScan, we need to create the
969 * cursor on the remote side.
971 if (!fsstate->cursor_exists)
975 * Get some more tuples, if we've run out.
977 if (fsstate->next_tuple >= fsstate->num_tuples)
979 /* No point in another fetch if we already detected EOF, though. */
980 if (!fsstate->eof_reached)
981 fetch_more_data(node);
982 /* If we didn't get any tuples, must be end of data. */
983 if (fsstate->next_tuple >= fsstate->num_tuples)
984 return ExecClearTuple(slot);
988 * Return the next tuple.
990 ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
999 * postgresReScanForeignScan
1003 postgresReScanForeignScan(ForeignScanState *node)
1005 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1009 /* If we haven't created the cursor yet, nothing to do. */
1010 if (!fsstate->cursor_exists)
1014 * If any internal parameters affecting this node have changed, we'd
1015 * better destroy and recreate the cursor. Otherwise, rewinding it should
1016 * be good enough. If we've only fetched zero or one batch, we needn't
1017 * even rewind the cursor, just rescan what we have.
1019 if (node->ss.ps.chgParam != NULL)
1021 fsstate->cursor_exists = false;
1022 snprintf(sql, sizeof(sql), "CLOSE c%u",
1023 fsstate->cursor_number);
1025 else if (fsstate->fetch_ct_2 > 1)
1027 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1028 fsstate->cursor_number);
1032 /* Easy: just rescan what we already have in memory, if anything */
1033 fsstate->next_tuple = 0;
1038 * We don't use a PG_TRY block here, so be careful not to throw error
1039 * without releasing the PGresult.
1041 res = PQexec(fsstate->conn, sql);
1042 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1043 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1046 /* Now force a fresh FETCH. */
1047 fsstate->tuples = NULL;
1048 fsstate->num_tuples = 0;
1049 fsstate->next_tuple = 0;
1050 fsstate->fetch_ct_2 = 0;
1051 fsstate->eof_reached = false;
1055 * postgresEndForeignScan
1056 * Finish scanning foreign table and dispose objects used for this scan
1059 postgresEndForeignScan(ForeignScanState *node)
1061 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1063 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1064 if (fsstate == NULL)
1067 /* Close the cursor if open, to prevent accumulation of cursors */
1068 if (fsstate->cursor_exists)
1069 close_cursor(fsstate->conn, fsstate->cursor_number);
1071 /* Release remote connection */
1072 ReleaseConnection(fsstate->conn);
1073 fsstate->conn = NULL;
1075 /* MemoryContexts will be deleted automatically. */
1079 * postgresAddForeignUpdateTargets
1080 * Add resjunk column(s) needed for update/delete on a foreign table
/*
 * Planner callback that adds the row-identity column needed for UPDATE and
 * DELETE on a postgres_fdw table.
 *
 * parsetree: the query being planned. A Var for
 *     SelfItemPointerAttributeNumber (ctid) of parsetree->resultRelation is
 *     wrapped in a TargetEntry whose resno is
 *     list_length(parsetree->targetList) + 1, and then appended to
 *     parsetree->targetList.
 * target_rte, target_relation: not referenced in the lines visible here.
 *
 * postgresBeginForeignModify later looks up this column as the "junk ctid
 * column" and sends it as the first prepared-statement parameter.
 *
 * NOTE(review): the remaining makeVar/makeTargetEntry arguments, including
 * the value assigned to attrname, are on lines omitted from this listing.
 */
1083 postgresAddForeignUpdateTargets(Query *parsetree,
1084 RangeTblEntry *target_rte,
1085 Relation target_relation)
1088 const char *attrname;
1092 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1095 /* Make a Var representing the desired value */
1096 var = makeVar(parsetree->resultRelation,
1097 SelfItemPointerAttributeNumber,
1103 /* Wrap it in a resjunk TLE with the right name ... */
1106 tle = makeTargetEntry((Expr *) var,
1107 list_length(parsetree->targetList) + 1,
1111 /* ... and add it to the query's targetlist */
1112 parsetree->targetList = lappend(parsetree->targetList, tle);
1116 * postgresPlanForeignModify
1117 * Plan an insert/update/delete operation on a foreign table
1119 * Note: currently, the plan tree generated for UPDATE/DELETE will always
1120 * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
1121 * and then the ModifyTable node will have to execute individual remote
1122 * UPDATE/DELETE commands. If there are no local conditions or joins
1123 * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
1124 * and then do nothing at ModifyTable. Room for future optimization ...
/*
 * Builds the remote SQL for one result relation of a ModifyTable plan at
 * plan time, and returns the private data the executor needs.
 *
 * root: planner state. resultRelation's RangeTblEntry is fetched from it.
 * plan: read for plan->operation and plan->returningLists (its parameter
 *     declaration is on a line omitted from this listing).
 * resultRelation: range-table index of the foreign table being modified.
 * subplan_index: selects this relation's entry in plan->returningLists.
 *
 * Returns a four-element list built with list_make4, whose order must match
 * enum FdwModifyPrivateIndex. Visible here are makeString(sql.data) and
 * makeInteger(returningList != NIL). postgresBeginForeignModify reads the
 * list back via FdwModifyPrivateUpdateSql, FdwModifyPrivateTargetAttnums,
 * FdwModifyPrivateHasReturning and FdwModifyPrivateRetrievedAttrs.
 *
 * Raises ERROR for an unexpected CmdType, or for an UPDATE whose modified
 * columns include a system column.
 *
 * NOTE(review): the case labels dispatching to deparseInsertSql/
 * deparseUpdateSql/deparseDeleteSql are on lines omitted from this listing.
 */
1127 postgresPlanForeignModify(PlannerInfo *root,
1129 Index resultRelation,
1132 CmdType operation = plan->operation;
1133 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1136 List *targetAttrs = NIL;
1137 List *returningList = NIL;
1138 List *retrieved_attrs = NIL;
1140 initStringInfo(&sql);
1143 * Core code already has some lock on each rel being planned, so we can
1146 rel = heap_open(rte->relid, NoLock);
1149 * In an INSERT, we transmit all columns that are defined in the foreign
1150 * table. In an UPDATE, we transmit only columns that were explicitly
1151 * targets of the UPDATE, so as to avoid unnecessary data transmission.
1152 * (We can't do that for INSERT since we would miss sending default values
1153 * for columns not listed in the source statement.)
1155 if (operation == CMD_INSERT)
1157 TupleDesc tupdesc = RelationGetDescr(rel);
1160 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1162 Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1164 if (!attr->attisdropped)
1165 targetAttrs = lappend_int(targetAttrs, attnum);
1168 else if (operation == CMD_UPDATE)
1170 Bitmapset *tmpset = bms_copy(rte->modifiedCols);
/*
 * Walk a copy, because bms_first_member() removes each member it returns;
 * this leaves rte->modifiedCols intact. Members are stored offset by
 * FirstLowInvalidHeapAttributeNumber, which is added back to recover the
 * real attnum.
 */
1173 while ((col = bms_first_member(tmpset)) >= 0)
1175 col += FirstLowInvalidHeapAttributeNumber;
1176 if (col <= InvalidAttrNumber) /* shouldn't happen */
1177 elog(ERROR, "system-column update is not supported");
1178 targetAttrs = lappend_int(targetAttrs, col);
1183 * Extract the relevant RETURNING list if any.
1185 if (plan->returningLists)
1186 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1189 * Construct the SQL command string.
1194 deparseInsertSql(&sql, root, resultRelation, rel,
1195 targetAttrs, returningList,
1199 deparseUpdateSql(&sql, root, resultRelation, rel,
1200 targetAttrs, returningList,
1204 deparseDeleteSql(&sql, root, resultRelation, rel,
1209 elog(ERROR, "unexpected operation: %d", (int) operation);
/* Close with NoLock, matching the NoLock open above; the lock taken by core code is left in place. */
1213 heap_close(rel, NoLock);
1216 * Build the fdw_private list that will be available to the executor.
1217 * Items in the list must match enum FdwModifyPrivateIndex, above.
1219 return list_make4(makeString(sql.data),
1221 makeInteger((returningList != NIL)),
1226 * postgresBeginForeignModify
1227 * Begin an insert/update/delete operation on a foreign table
/*
 * Executor startup for an insert/update/delete on a foreign table.
 *
 * mtstate: the ModifyTable state. It supplies the EState, the CmdType and
 *     (via mt_plans[subplan_index]) the subplan whose targetlist carries the
 *     junk ctid column.
 * resultRelInfo: the target relation. The new PgFdwModifyState is stored in
 *     resultRelInfo->ri_FdwState.
 * fdw_private, subplan_index, eflags: declared on lines omitted from this
 *     listing. fdw_private is the list built by postgresPlanForeignModify.
 *     When eflags has EXEC_FLAG_EXPLAIN_ONLY, nothing is set up.
 *
 * The remote access is done as rte->checkAsUser, falling back to
 * GetUserId(). The function opens the connection, decodes fdw_private,
 * creates a per-tuple memory context under es_query_cxt, and looks up the
 * output function for every prepared-statement parameter. The remote
 * statement itself is prepared lazily by the ExecForeign* callbacks
 * (p_name starts out NULL).
 *
 * Raises ERROR if an UPDATE/DELETE subplan has no junk ctid column.
 */
1230 postgresBeginForeignModify(ModifyTableState *mtstate,
1231 ResultRelInfo *resultRelInfo,
1236 PgFdwModifyState *fmstate;
1237 EState *estate = mtstate->ps.state;
1238 CmdType operation = mtstate->operation;
1239 Relation rel = resultRelInfo->ri_RelationDesc;
1242 ForeignTable *table;
1243 ForeignServer *server;
1245 AttrNumber n_params;
1251 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1254 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1257 /* Begin constructing PgFdwModifyState. */
1258 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1262 * Identify which user to do the remote access as. This should match what
1263 * ExecCheckRTEPerms() does.
1265 rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1266 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1268 /* Get info about foreign table. */
1269 table = GetForeignTable(RelationGetRelid(rel));
1270 server = GetForeignServer(table->serverid);
1271 user = GetUserMapping(userid, server->serverid);
1273 /* Open connection; report that we'll create a prepared statement. */
1274 fmstate->conn = GetConnection(server, user, true);
1275 fmstate->p_name = NULL; /* prepared statement not made yet */
1277 /* Deconstruct fdw_private data. */
1278 fmstate->query = strVal(list_nth(fdw_private,
1279 FdwModifyPrivateUpdateSql));
1280 fmstate->target_attrs = (List *) list_nth(fdw_private,
1281 FdwModifyPrivateTargetAttnums);
1282 fmstate->has_returning = intVal(list_nth(fdw_private,
1283 FdwModifyPrivateHasReturning));
1284 fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1285 FdwModifyPrivateRetrievedAttrs);
1287 /* Create context for per-tuple temp workspace. */
1288 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1289 "postgres_fdw temporary data",
1290 ALLOCSET_SMALL_MINSIZE,
1291 ALLOCSET_SMALL_INITSIZE,
1292 ALLOCSET_SMALL_MAXSIZE);
1294 /* Prepare for input conversion of RETURNING results. */
1295 if (fmstate->has_returning)
1296 fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
1298 /* Prepare for output conversion of parameters used in prepared stmt. */
/* One slot per target column, plus one for the ctid that UPDATE/DELETE send first. */
1299 n_params = list_length(fmstate->target_attrs) + 1;
1300 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1301 fmstate->p_nums = 0;
1303 if (operation == CMD_UPDATE || operation == CMD_DELETE)
1305 /* Find the ctid resjunk column in the subplan's result */
1306 Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1308 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
1310 if (!AttributeNumberIsValid(fmstate->ctidAttno))
1311 elog(ERROR, "could not find junk ctid column");
1313 /* First transmittable parameter will be ctid */
1314 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1315 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1319 if (operation == CMD_INSERT || operation == CMD_UPDATE)
1321 /* Set up for remaining transmittable parameters */
1322 foreach(lc, fmstate->target_attrs)
1324 int attnum = lfirst_int(lc);
1325 Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1327 Assert(!attr->attisdropped);
1329 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1330 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
/*
 * p_nums is presumably advanced after each fmgr_info call (those lines are
 * omitted from this listing). "<=" rather than "==" because INSERT never
 * uses the extra ctid slot.
 */
1335 Assert(fmstate->p_nums <= n_params);
1337 resultRelInfo->ri_FdwState = fmstate;
1341 * postgresExecForeignInsert
1342 * Insert one row into a foreign table
/*
 * Inserts one row through the prepared remote INSERT.
 *
 * slot: the row to insert. Its columns listed in fmstate->target_attrs are
 *     converted to text parameters (no ctid is sent). If there is a
 *     RETURNING row, it is stored back into slot.
 * estate, planSlot: not referenced in the lines visible here.
 *
 * The remote statement is prepared on first use. The result status must be
 * PGRES_TUPLES_OK when the statement has RETURNING and PGRES_COMMAND_OK
 * otherwise; any other status raises ERROR. The row count comes from
 * PQntuples() (with RETURNING) or PQcmdTuples() (without). The parameter
 * strings live in fmstate->temp_cxt, which is reset before returning.
 *
 * Returns slot if at least one row was inserted remotely, otherwise NULL.
 *
 * NOTE(review): atoi() maps an empty PQcmdTuples() string to 0, so a missing
 * count reads as "nothing inserted". The following are on lines omitted from
 * this listing: the PQexecPrepared arguments, the condition guarding
 * store_returning_result (presumably n_rows > 0), and the release of res.
 */
1344 static TupleTableSlot *
1345 postgresExecForeignInsert(EState *estate,
1346 ResultRelInfo *resultRelInfo,
1347 TupleTableSlot *slot,
1348 TupleTableSlot *planSlot)
1350 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1351 const char **p_values;
1355 /* Set up the prepared statement on the remote server, if we didn't yet */
1356 if (!fmstate->p_name)
1357 prepare_foreign_modify(fmstate);
1359 /* Convert parameters needed by prepared statement to text form */
1360 p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1363 * Execute the prepared statement, and check for success.
1365 * We don't use a PG_TRY block here, so be careful not to throw error
1366 * without releasing the PGresult.
1368 res = PQexecPrepared(fmstate->conn,
1375 if (PQresultStatus(res) !=
1376 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1377 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1379 /* Check number of rows affected, and fetch RETURNING tuple if any */
1380 if (fmstate->has_returning)
1382 n_rows = PQntuples(res);
1384 store_returning_result(fmstate, slot, res);
1387 n_rows = atoi(PQcmdTuples(res));
1392 MemoryContextReset(fmstate->temp_cxt);
1394 /* Return NULL if nothing was inserted on the remote end */
1395 return (n_rows > 0) ? slot : NULL;
1399 * postgresExecForeignUpdate
1400 * Update one row in a foreign table
/*
 * Updates one row through the prepared remote UPDATE.
 *
 * planSlot: the subplan's output row. The remote row's ctid is read from its
 *     junk column and passed to convert_prep_stmt_params as the tupleid. The
 *     attribute-number argument and the remaining conversion argument are on
 *     lines omitted from this listing.
 * slot: receives the RETURNING row, if any.
 * estate: not referenced in the lines visible here.
 *
 * The remote statement is prepared on first use. ERROR is raised if the
 * ctid is NULL, or if the result status is not PGRES_TUPLES_OK (with
 * RETURNING) / PGRES_COMMAND_OK (without). fmstate->temp_cxt is reset
 * before returning.
 *
 * Returns slot if the remote side reports at least one updated row,
 * otherwise NULL.
 *
 * NOTE(review): atoi() maps an empty PQcmdTuples() string to 0. This block
 * nearly duplicates postgresExecForeignInsert/Delete; a shared helper could
 * remove the repetition.
 */
1402 static TupleTableSlot *
1403 postgresExecForeignUpdate(EState *estate,
1404 ResultRelInfo *resultRelInfo,
1405 TupleTableSlot *slot,
1406 TupleTableSlot *planSlot)
1408 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1411 const char **p_values;
1415 /* Set up the prepared statement on the remote server, if we didn't yet */
1416 if (!fmstate->p_name)
1417 prepare_foreign_modify(fmstate);
1419 /* Get the ctid that was passed up as a resjunk column */
1420 datum = ExecGetJunkAttribute(planSlot,
1423 /* shouldn't ever get a null result... */
1425 elog(ERROR, "ctid is NULL");
1427 /* Convert parameters needed by prepared statement to text form */
1428 p_values = convert_prep_stmt_params(fmstate,
1429 (ItemPointer) DatumGetPointer(datum),
1433 * Execute the prepared statement, and check for success.
1435 * We don't use a PG_TRY block here, so be careful not to throw error
1436 * without releasing the PGresult.
1438 res = PQexecPrepared(fmstate->conn,
1445 if (PQresultStatus(res) !=
1446 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1447 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1449 /* Check number of rows affected, and fetch RETURNING tuple if any */
1450 if (fmstate->has_returning)
1452 n_rows = PQntuples(res);
1454 store_returning_result(fmstate, slot, res);
1457 n_rows = atoi(PQcmdTuples(res));
1462 MemoryContextReset(fmstate->temp_cxt);
1464 /* Return NULL if nothing was updated on the remote end */
1465 return (n_rows > 0) ? slot : NULL;
1469 * postgresExecForeignDelete
1470 * Delete one row from a foreign table
/*
 * Deletes one row through the prepared remote DELETE.
 *
 * planSlot: the subplan's output row. The remote row's ctid is read from its
 *     junk column and passed to convert_prep_stmt_params as the tupleid. The
 *     attribute-number argument and the remaining conversion argument are on
 *     lines omitted from this listing.
 * slot: receives the RETURNING row, if any.
 * estate: not referenced in the lines visible here.
 *
 * The remote statement is prepared on first use. ERROR is raised if the
 * ctid is NULL, or if the result status is not PGRES_TUPLES_OK (with
 * RETURNING) / PGRES_COMMAND_OK (without). fmstate->temp_cxt is reset
 * before returning.
 *
 * Returns slot if the remote side reports at least one deleted row,
 * otherwise NULL.
 *
 * NOTE(review): atoi() maps an empty PQcmdTuples() string to 0.
 */
1472 static TupleTableSlot *
1473 postgresExecForeignDelete(EState *estate,
1474 ResultRelInfo *resultRelInfo,
1475 TupleTableSlot *slot,
1476 TupleTableSlot *planSlot)
1478 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1481 const char **p_values;
1485 /* Set up the prepared statement on the remote server, if we didn't yet */
1486 if (!fmstate->p_name)
1487 prepare_foreign_modify(fmstate);
1489 /* Get the ctid that was passed up as a resjunk column */
1490 datum = ExecGetJunkAttribute(planSlot,
1493 /* shouldn't ever get a null result... */
1495 elog(ERROR, "ctid is NULL");
1497 /* Convert parameters needed by prepared statement to text form */
1498 p_values = convert_prep_stmt_params(fmstate,
1499 (ItemPointer) DatumGetPointer(datum),
1503 * Execute the prepared statement, and check for success.
1505 * We don't use a PG_TRY block here, so be careful not to throw error
1506 * without releasing the PGresult.
1508 res = PQexecPrepared(fmstate->conn,
1515 if (PQresultStatus(res) !=
1516 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1517 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1519 /* Check number of rows affected, and fetch RETURNING tuple if any */
1520 if (fmstate->has_returning)
1522 n_rows = PQntuples(res);
1524 store_returning_result(fmstate, slot, res);
1527 n_rows = atoi(PQcmdTuples(res));
1532 MemoryContextReset(fmstate->temp_cxt);
1534 /* Return NULL if nothing was deleted on the remote end */
1535 return (n_rows > 0) ? slot : NULL;
1539 * postgresEndForeignModify
1540 * Finish an insert/update/delete operation on a foreign table
/*
 * Executor shutdown for a foreign-table modification.
 *
 * resultRelInfo: ri_FdwState holds the PgFdwModifyState, or NULL in the
 *     EXPLAIN-only case (per the check below), in which case nothing is done.
 * estate: not referenced in the lines visible here.
 *
 * If a prepared statement was created (fmstate->p_name is set), it is
 * dropped remotely with "DEALLOCATE <name>"; a failure raises ERROR. The
 * connection is then released and fmstate->conn cleared.
 */
1543 postgresEndForeignModify(EState *estate,
1544 ResultRelInfo *resultRelInfo)
1546 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1548 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1549 if (fmstate == NULL)
1552 /* If we created a prepared statement, destroy it */
1553 if (fmstate->p_name)
1558 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
1561 * We don't use a PG_TRY block here, so be careful not to throw error
1562 * without releasing the PGresult.
1564 res = PQexec(fmstate->conn, sql);
1565 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1566 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
/* Record that the remote statement no longer exists. */
1568 fmstate->p_name = NULL;
1571 /* Release remote connection */
1572 ReleaseConnection(fmstate->conn);
1573 fmstate->conn = NULL;
1577 * postgresIsForeignRelUpdatable
1578 * Determine whether a foreign table supports INSERT, UPDATE and/or
/*
 * rel: the foreign table being asked about.
 *
 * The "updatable" option is read first from the server's options and then
 * from the table's. The table loop runs last, so a table-level setting
 * overrides a server-level one.
 *
 * Returns (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) when
 * updatable, otherwise 0.
 *
 * NOTE(review): the initialization of "updatable" (true by default, per the
 * comment below) and the start of the return expression are on lines
 * omitted from this listing.
 */
1582 postgresIsForeignRelUpdatable(Relation rel)
1585 ForeignTable *table;
1586 ForeignServer *server;
1590 * By default, all postgres_fdw foreign tables are assumed updatable. This
1591 * can be overridden by a per-server setting, which in turn can be
1592 * overridden by a per-table setting.
1596 table = GetForeignTable(RelationGetRelid(rel));
1597 server = GetForeignServer(table->serverid);
1599 foreach(lc, server->options)
1601 DefElem *def = (DefElem *) lfirst(lc);
1603 if (strcmp(def->defname, "updatable") == 0)
1604 updatable = defGetBoolean(def);
/* Table options are scanned second so they take precedence. */
1606 foreach(lc, table->options)
1608 DefElem *def = (DefElem *) lfirst(lc);
1610 if (strcmp(def->defname, "updatable") == 0)
1611 updatable = defGetBoolean(def);
1615 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
1618 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
1622 * postgresExplainForeignScan
1623 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
/*
 * node: the scan node whose plan's fdw_private carries the remote SELECT
 *     (item FdwScanPrivateSelectSql).
 * es: the EXPLAIN output state. The SELECT text is emitted as the
 *     "Remote SQL" property.
 *
 * NOTE(review): lines 1627-1632 are omitted from this listing. Any guard
 * there (presumably on es->verbose) is not visible.
 */
1626 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
1633 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
1634 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
1635 ExplainPropertyText("Remote SQL", sql, es);
1640 * postgresExplainForeignModify
1641 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
/*
 * Emits the remote INSERT/UPDATE/DELETE text stored in fdw_private (item
 * FdwModifyPrivateUpdateSql) as the "Remote SQL" EXPLAIN property.
 *
 * mtstate, rinfo: not referenced in the lines visible here.
 * fdw_private, es: declared on lines omitted from this listing.
 *
 * NOTE(review): lines 1646-1651 are omitted. Any guard there (presumably on
 * es->verbose) is not visible.
 */
1644 postgresExplainForeignModify(ModifyTableState *mtstate,
1645 ResultRelInfo *rinfo,
1652 char *sql = strVal(list_nth(fdw_private,
1653 FdwModifyPrivateUpdateSql));
1655 ExplainPropertyText("Remote SQL", sql, es);
1661 * estimate_path_cost_size
1662 * Get cost and size estimates for a foreign scan
1664 * We assume that all the baserestrictinfo clauses will be applied, plus
1665 * any join clauses listed in join_conds.
/*
 * root, baserel: planner state and the foreign base relation.
 *     baserel->fdw_private must be its PgFdwRelationInfo.
 * join_conds: extra clauses assumed to be applied remotely (declared on a
 *     line omitted from this listing). Must be NIL unless
 *     fpinfo->use_remote_estimate is set (asserted below).
 * p_rows, p_width, p_startup_cost, p_total_cost: output estimates.
 *
 * Remote mode (use_remote_estimate): the function deparses
 * "EXPLAIN SELECT ..." with remote_conds and join_conds in WHERE, and runs
 * it on a connection obtained as fpinfo->user. It takes rows/width/costs
 * from the result, then applies local_conds' selectivity to rows and their
 * evaluation cost to the totals.
 *
 * Local mode: the function starts from baserel->rows/width and backs out the
 * number of rows fetched remotely (dividing by local_conds_sel, capped at
 * baserel->tuples). It then costs the scan like a seqscan over
 * baserel->pages and baserel->tuples.
 *
 * In both modes, fdw_startup_cost is added once to both startup and total
 * cost, and fdw_tuple_cost plus cpu_tuple_cost are charged per retrieved
 * row.
 */
1668 estimate_path_cost_size(PlannerInfo *root,
1669 RelOptInfo *baserel,
1671 double *p_rows, int *p_width,
1672 Cost *p_startup_cost, Cost *p_total_cost)
1674 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1676 double retrieved_rows;
1684 * If the table or the server is configured to use remote estimates,
1685 * connect to the foreign server and execute EXPLAIN to estimate the
1686 * number of rows selected by the restriction+join clauses. Otherwise,
1687 * estimate rows using whatever statistics we have locally, in a way
1688 * similar to ordinary tables.
1690 if (fpinfo->use_remote_estimate)
1693 List *retrieved_attrs;
1697 * Construct EXPLAIN query including the desired SELECT, FROM, and
1698 * WHERE clauses. Params and other-relation Vars are replaced by
1701 initStringInfo(&sql);
1702 appendStringInfoString(&sql, "EXPLAIN ");
1703 deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
1705 if (fpinfo->remote_conds)
1706 appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
1709 appendWhereClause(&sql, root, baserel, join_conds,
1710 (fpinfo->remote_conds == NIL), NULL);
1712 /* Get the remote estimate */
1713 conn = GetConnection(fpinfo->server, fpinfo->user, false);
1714 get_remote_estimate(sql.data, conn, &rows, &width,
1715 &startup_cost, &total_cost);
1716 ReleaseConnection(conn);
/* The remote row count is what crosses the network, before local_conds filter it. */
1718 retrieved_rows = rows;
1720 /* Factor in the selectivity of the local_conds */
1721 rows = clamp_row_est(rows * fpinfo->local_conds_sel);
1723 /* Add in the eval cost of the local_conds */
1724 startup_cost += fpinfo->local_conds_cost.startup;
1725 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
1730 * We don't support join conditions in this mode (hence, no
1731 * parameterized paths can be made).
1733 Assert(join_conds == NIL);
1735 /* Use rows/width estimates made by set_baserel_size_estimates. */
1736 rows = baserel->rows;
1737 width = baserel->width;
1740 * Back into an estimate of the number of retrieved rows. Just in
1741 * case this is nuts, clamp to at most baserel->tuples.
1743 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
1744 retrieved_rows = Min(retrieved_rows, baserel->tuples);
1747 * Cost as though this were a seqscan, which is pessimistic. We
1748 * effectively imagine the local_conds are being evaluated remotely,
1753 run_cost += seq_page_cost * baserel->pages;
1755 startup_cost += baserel->baserestrictcost.startup;
1756 cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple;
1757 run_cost += cpu_per_tuple * baserel->tuples;
1759 total_cost = startup_cost + run_cost;
1763 * Add some additional cost factors to account for connection overhead
1764 * (fdw_startup_cost), transferring data across the network
1765 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
1766 * (cpu_tuple_cost per retrieved row).
1768 startup_cost += fpinfo->fdw_startup_cost;
1769 total_cost += fpinfo->fdw_startup_cost;
1770 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
1771 total_cost += cpu_tuple_cost * retrieved_rows;
1773 /* Return results. */
1776 *p_startup_cost = startup_cost;
1777 *p_total_cost = total_cost;
1781 * Estimate costs of executing a SQL statement remotely.
1782 * The given "sql" must be an EXPLAIN command.
/*
 * sql: the EXPLAIN command to run. conn: the connection to run it on.
 * rows, width, startup_cost, total_cost: outputs, parsed with sscanf from
 *     the last "(cost=...)" group of the first output line (the topmost
 *     plan node).
 *
 * Raises ERROR if the command does not return tuples or the line cannot be
 * interpreted.
 *
 * res is declared volatile and pgfdw_report_error is called with false,
 * unlike the "no PG_TRY" callers that pass true. This presumably means an
 * error-cleanup block (omitted from this listing) releases res here.
 *
 * NOTE(review): PQgetvalue(res, 0, 0) assumes at least one result row. The
 * guards for both elog calls (lines 1812 and 1816) are omitted; presumably
 * they check p == NULL and that sscanf matched all four fields.
 */
1785 get_remote_estimate(const char *sql, PGconn *conn,
1786 double *rows, int *width,
1787 Cost *startup_cost, Cost *total_cost)
1789 PGresult *volatile res = NULL;
1791 /* PGresult must be released before leaving this function. */
1799 * Execute EXPLAIN remotely.
1801 res = PQexec(conn, sql);
1802 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1803 pgfdw_report_error(ERROR, res, conn, false, sql);
1806 * Extract cost numbers for topmost plan node. Note we search for a
1807 * left paren from the end of the line to avoid being confused by
1808 * other uses of parentheses.
1810 line = PQgetvalue(res, 0, 0);
1811 p = strrchr(line, '(');
1813 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
1814 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
1815 startup_cost, total_cost, rows, width);
1817 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
1832 * Detect whether we want to process an EquivalenceClass member.
1834 * This is a callback for use by generate_implied_equalities_for_column.
/*
 * root, rel, ec: not referenced in the lines visible here.
 * em: the candidate member. Its em_expr is the expression tested.
 * arg: points to an ec_member_foreign_arg (declared on an omitted line).
 *
 * Once state->current is set, only members equal() to it match. Otherwise,
 * an expression already in state->already_used is skipped, and the first
 * new one becomes state->current.
 *
 * NOTE(review): the return statements for the "skip" and "new target" paths
 * (lines 1855 and 1859) are omitted from this listing.
 */
1837 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
1838 EquivalenceClass *ec, EquivalenceMember *em,
1841 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
1842 Expr *expr = em->em_expr;
1845 * If we've identified what we're processing in the current scan, we only
1846 * want to match that expression.
1848 if (state->current != NULL)
1849 return equal(expr, state->current);
1852 * Otherwise, ignore anything we've already processed.
1854 if (list_member(state->already_used, expr))
1857 /* This is the new target to process. */
1858 state->current = expr;
1863 * Create cursor for node's query with current parameter values.
/*
 * node: the scan node. Its PgFdwScanState supplies the connection, query,
 *     cursor number, parameter expressions and their output functions.
 *
 * Each fsstate->param_exprs entry is evaluated in econtext's per-tuple
 * memory with set_transmission_modes() in effect. The text form is written
 * into the reused fsstate->param_values array. The function then sends
 * "DECLARE c<cursor_number> CURSOR FOR\n<query>" via PQexecParams with NULL
 * paramTypes.
 *
 * On success the cursor is marked open and the tuple-buffer bookkeeping
 * (tuples, num_tuples, next_tuple, fetch_ct_2, eof_reached) is reset. On
 * failure, ERROR is raised reporting fsstate->query rather than the DECLARE.
 */
1866 create_cursor(ForeignScanState *node)
1868 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1869 ExprContext *econtext = node->ss.ps.ps_ExprContext;
1870 int numParams = fsstate->numParams;
1871 const char **values = fsstate->param_values;
1872 PGconn *conn = fsstate->conn;
1877 * Construct array of query parameter values in text format. We do the
1878 * conversions in the short-lived per-tuple context, so as not to cause a
1879 * memory leak over repeated scans.
1884 MemoryContext oldcontext;
1888 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1890 nestlevel = set_transmission_modes();
1893 foreach(lc, fsstate->param_exprs)
1895 ExprState *expr_state = (ExprState *) lfirst(lc);
1899 /* Evaluate the parameter expression */
1900 expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
1903 * Get string representation of each parameter value by invoking
1904 * type-specific output function, unless the value is null.
1909 values[i] = OutputFunctionCall(&fsstate->param_flinfo[i],
1914 reset_transmission_modes(nestlevel);
1916 MemoryContextSwitchTo(oldcontext);
1919 /* Construct the DECLARE CURSOR command */
1920 initStringInfo(&buf);
1921 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
1922 fsstate->cursor_number, fsstate->query);
1925 * Notice that we pass NULL for paramTypes, thus forcing the remote server
1926 * to infer types for all parameters. Since we explicitly cast every
1927 * parameter (see deparse.c), the "inference" is trivial and will produce
1928 * the desired result. This allows us to avoid assuming that the remote
1929 * server has the same OIDs we do for the parameters' types.
1931 * We don't use a PG_TRY block here, so be careful not to throw error
1932 * without releasing the PGresult.
1934 res = PQexecParams(conn, buf.data, numParams, NULL, values,
1936 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1937 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
1940 /* Mark the cursor as created, and show no tuples have been retrieved */
1941 fsstate->cursor_exists = true;
1942 fsstate->tuples = NULL;
1943 fsstate->num_tuples = 0;
1944 fsstate->next_tuple = 0;
1945 fsstate->fetch_ct_2 = 0;
1946 fsstate->eof_reached = false;
1953 * Fetch some more rows from the node's cursor.
/*
 * node: the scan node. Its PgFdwScanState must have an open cursor.
 *
 * The previous batch is discarded (fsstate->batch_cxt is reset). The
 * function then sends "FETCH <fetch_size> FROM c<cursor_number>" and
 * converts every returned row into a HeapTuple allocated in batch_cxt.
 *
 * fetch_ct_2 counts batches but stops at 2. That is all the rescan logic
 * needs to tell "zero or one batch" (rescan from memory) from "more" (its
 * fetch_ct_2 > 1 test issues MOVE BACKWARD ALL). eof_reached is set when the
 * server returned fewer rows than requested.
 *
 * On error, the original query is reported rather than the FETCH.
 */
1956 fetch_more_data(ForeignScanState *node)
1958 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1959 PGresult *volatile res = NULL;
1960 MemoryContext oldcontext;
1963 * We'll store the tuples in the batch_cxt. First, flush the previous
/* Clear the pointer first so fsstate->tuples never points into the reset context. */
1966 fsstate->tuples = NULL;
1967 MemoryContextReset(fsstate->batch_cxt);
1968 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
1970 /* PGresult must be released before leaving this function. */
1973 PGconn *conn = fsstate->conn;
1979 /* The fetch size is arbitrary, but shouldn't be enormous. */
1982 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
1983 fetch_size, fsstate->cursor_number);
1985 res = PQexec(conn, sql);
1986 /* On error, report the original query, not the FETCH. */
1987 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1988 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
1990 /* Convert the data into HeapTuples */
1991 numrows = PQntuples(res);
1992 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
1993 fsstate->num_tuples = numrows;
1994 fsstate->next_tuple = 0;
1996 for (i = 0; i < numrows; i++)
1998 fsstate->tuples[i] =
1999 make_tuple_from_result_row(res, i,
2002 fsstate->retrieved_attrs,
2006 /* Update fetch_ct_2 */
2007 if (fsstate->fetch_ct_2 < 2)
2008 fsstate->fetch_ct_2++;
2010 /* Must be EOF if we didn't get as many tuples as we asked for. */
2011 fsstate->eof_reached = (numrows < fetch_size);
2024 MemoryContextSwitchTo(oldcontext);
2028 * Force assorted GUC parameters to settings that ensure that we'll output
2029 * data values in a form that is unambiguous to the remote server.
2031 * This is rather expensive and annoying to do once per row, but there's
2032 * little choice if we want to be sure values are transmitted accurately;
2033 * we can't leave the settings in place between rows for fear of affecting
2034 * user-visible computations.
2036 * We use the equivalent of a function SET option to allow the settings to
2037 * persist only until the caller calls reset_transmission_modes(). If an
2038 * error is thrown in between, guc.c will take care of undoing the settings.
2040 * The return value is the nestlevel that must be passed to
2041 * reset_transmission_modes() to undo things.
/*
 * Opens a new GUC nest level. Each setting is changed only when it is not
 * already at the desired value:
 *   datestyle          -> ISO       (if DateStyle != USE_ISO_DATES)
 *   intervalstyle      -> postgres  (if IntervalStyle != INTSTYLE_POSTGRES)
 *   extra_float_digits -> 3         (if currently below 3)
 * All of them use GUC_ACTION_SAVE at PGC_S_SESSION.
 *
 * NOTE(review): the closing "return nestlevel;" is not part of this listing.
 */
2044 set_transmission_modes(void)
2046 int nestlevel = NewGUCNestLevel();
2049 * The values set here should match what pg_dump does. See also
2050 * configure_remote_session in connection.c.
2052 if (DateStyle != USE_ISO_DATES)
2053 (void) set_config_option("datestyle", "ISO",
2054 PGC_USERSET, PGC_S_SESSION,
2055 GUC_ACTION_SAVE, true, 0);
2056 if (IntervalStyle != INTSTYLE_POSTGRES)
2057 (void) set_config_option("intervalstyle", "postgres",
2058 PGC_USERSET, PGC_S_SESSION,
2059 GUC_ACTION_SAVE, true, 0);
2060 if (extra_float_digits < 3)
2061 (void) set_config_option("extra_float_digits", "3",
2062 PGC_USERSET, PGC_S_SESSION,
2063 GUC_ACTION_SAVE, true, 0);
2069 * Undo the effects of set_transmission_modes().
/*
 * nestlevel: the value returned by set_transmission_modes().
 * AtEOXact_GUC(true, nestlevel) unwinds that GUC nest level, restoring the
 * values saved by its GUC_ACTION_SAVE calls.
 */
2072 reset_transmission_modes(int nestlevel)
2074 AtEOXact_GUC(true, nestlevel);
2078 * Utility routine to close a cursor.
/*
 * conn: connection that owns the cursor.
 * cursor_number: N in the cursor name "cN".
 *
 * Sends "CLOSE c<cursor_number>". Raises ERROR if the reply is not
 * PGRES_COMMAND_OK.
 */
2081 close_cursor(PGconn *conn, unsigned int cursor_number)
2086 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
2089 * We don't use a PG_TRY block here, so be careful not to throw error
2090 * without releasing the PGresult.
2092 res = PQexec(conn, sql);
2093 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2094 pgfdw_report_error(ERROR, res, conn, true, sql);
2099 * prepare_foreign_modify
2100 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
/*
 * fmstate: the modify state whose connection and query are used.
 *
 * Prepares fmstate->query on the remote server under the name
 * "pgsql_fdw_prep_<n>", where n comes from GetPrepStmtNumber(conn).
 * fmstate->p_name is assigned only after PQprepare succeeds, so a non-NULL
 * p_name means the statement exists remotely. The ExecForeign* callbacks
 * and postgresEndForeignModify rely on this. On failure, ERROR is raised
 * reporting fmstate->query.
 */
2103 prepare_foreign_modify(PgFdwModifyState *fmstate)
2105 char prep_name[NAMEDATALEN];
2109 /* Construct name we'll use for the prepared statement. */
2110 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
2111 GetPrepStmtNumber(fmstate->conn));
/* Copy out of the stack buffer; fmstate keeps the name after we return. */
2112 p_name = pstrdup(prep_name);
2115 * We intentionally do not specify parameter types here, but leave the
2116 * remote server to derive them by default. This avoids possible problems
2117 * with the remote server using different type OIDs than we do. All of
2118 * the prepared statements we use in this module are simple enough that
2119 * the remote server will make the right choices.
2121 * We don't use a PG_TRY block here, so be careful not to throw error
2122 * without releasing the PGresult.
2124 res = PQprepare(fmstate->conn,
2130 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2131 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
2134 /* This action shows that the prepare has been done. */
2135 fmstate->p_name = p_name;
2139 * convert_prep_stmt_params
2140 * Create array of text strings representing parameter values
2142 * tupleid is ctid to send, or NULL if none
2143 * slot is slot to get remaining parameters from, or NULL if none
2145 * Data is constructed in temp_cxt; caller should reset that after use.
/*
 * Builds an array of fmstate->p_nums strings, allocated in
 * fmstate->temp_cxt, in this order:
 *   1. the ctid, when tupleid != NULL (converted without transmission modes);
 *   2. one entry per attnum in fmstate->target_attrs, read from slot with
 *      slot_getattr and converted with set_transmission_modes() in effect.
 *      A NULL entry stands for an SQL NULL.
 * Entry i is converted with fmstate->p_flinfo[i].
 *
 * NOTE(review): pindex's initialization and increments, the isnull test and
 * the final return are on lines omitted from this listing.
 */
2147 static const char **
2148 convert_prep_stmt_params(PgFdwModifyState *fmstate,
2149 ItemPointer tupleid,
2150 TupleTableSlot *slot)
2152 const char **p_values;
2154 MemoryContext oldcontext;
2156 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
2158 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
2160 /* 1st parameter should be ctid, if it's in use */
2161 if (tupleid != NULL)
2163 /* don't need set_transmission_modes for TID output */
2164 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
2165 PointerGetDatum(tupleid));
2169 /* get following parameters from slot */
2170 if (slot != NULL && fmstate->target_attrs != NIL)
2175 nestlevel = set_transmission_modes();
2177 foreach(lc, fmstate->target_attrs)
2179 int attnum = lfirst_int(lc);
2183 value = slot_getattr(slot, attnum, &isnull);
2185 p_values[pindex] = NULL;
2187 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
2192 reset_transmission_modes(nestlevel);
2195 Assert(pindex == fmstate->p_nums);
2197 MemoryContextSwitchTo(oldcontext);
2203 * store_returning_result
2204 * Store the result of a RETURNING clause
2206 * On error, be sure to release the PGresult on the way out. Callers do not
2207 * have PG_TRY blocks to ensure this happens.
/*
 * fmstate: supplies retrieved_attrs (and, on omitted lines, the other
 *     conversion inputs).
 * slot: receives the new tuple. ExecStoreTuple is called with
 *     shouldFree = true, so the slot owns the tuple and frees it when
 *     cleared.
 * res: the RETURNING result. Only row 0 is converted.
 */
2210 store_returning_result(PgFdwModifyState *fmstate,
2211 TupleTableSlot *slot, PGresult *res)
2213 /* PGresult must be released before leaving this function. */
2218 newtup = make_tuple_from_result_row(res, 0,
2221 fmstate->retrieved_attrs,
2223 /* tuple will be deleted when it is cleared from the slot */
2224 ExecStoreTuple(newtup, slot, InvalidBuffer, true);
2236 * postgresAnalyzeForeignTable
2237 * Test whether analyzing this foreign table is supported
/*
 * relation: the foreign table being analyzed.
 * func: set to postgresAcquireSampleRowsFunc.
 * totalpages: set to the page count reported by the remote server.
 *
 * Connects as the table owner (relation->rd_rel->relowner), runs the query
 * built by deparseAnalyzeSizeSql, and requires exactly one row with one
 * column. Raises ERROR if the query fails or the result has another shape.
 *
 * NOTE(review): strtoul() silently yields 0 for non-numeric text. The
 * function's return value and the error-cleanup scaffolding for res are on
 * lines omitted from this listing.
 */
2240 postgresAnalyzeForeignTable(Relation relation,
2241 AcquireSampleRowsFunc *func,
2242 BlockNumber *totalpages)
2244 ForeignTable *table;
2245 ForeignServer *server;
2249 PGresult *volatile res = NULL;
2251 /* Return the row-analysis function pointer */
2252 *func = postgresAcquireSampleRowsFunc;
2255 * Now we have to get the number of pages. It's annoying that the ANALYZE
2256 * API requires us to return that now, because it forces some duplication
2257 * of effort between this routine and postgresAcquireSampleRowsFunc. But
2258 * it's probably not worth redefining that API at this point.
2262 * Get the connection to use. We do the remote access as the table's
2263 * owner, even if the ANALYZE was started by some other user.
2265 table = GetForeignTable(RelationGetRelid(relation));
2266 server = GetForeignServer(table->serverid);
2267 user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
2268 conn = GetConnection(server, user, false);
2271 * Construct command to get page count for relation.
2273 initStringInfo(&sql);
2274 deparseAnalyzeSizeSql(&sql, relation);
2276 /* In what follows, do not risk leaking any PGresults. */
2279 res = PQexec(conn, sql.data);
2280 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2281 pgfdw_report_error(ERROR, res, conn, false, sql.data);
2283 if (PQntuples(res) != 1 || PQnfields(res) != 1)
2284 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
2285 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2298 ReleaseConnection(conn);
2304 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
2306 * We fetch the whole table from the remote side and pick out some sample rows.
2308 * Selected rows are returned in the caller-allocated array rows[],
2309 * which must have at least targrows entries.
2310 * The actual number of rows selected is returned as the function result.
2311 * We also count the total number of rows in the table and return it into
2312 * *totalrows. Note that *totaldeadrows is always set to 0.
2314 * Note that the returned list of rows is not always in order by physical
2315 * position in the table. Therefore, correlation estimates derived later
2316 * may be meaningless, but it's OK because we don't use the estimates
2317 * currently (the planner only pays attention to correlation for indexscans).
/*
 * relation: the foreign table to sample.
 * elevel: presumably the level of the summary ereport at the end (its first
 *     line is omitted from this listing).
 * rows, targrows: the caller's output array and its capacity.
 * totalrows, totaldeadrows: outputs (totalrows is declared on an omitted
 *     line).
 *
 * Connects as the table owner and declares cursor c<N> over the query from
 * deparseAnalyzeSql. The cursor is fetched in batches of fetch_size rows,
 * and each row is passed to analyze_row_processor, which fills astate. The
 * loop checks for interrupts every batch and stops after the first short
 * batch. The cursor is then closed and the connection released.
 *
 * Returns astate.numrows (rows placed in rows[]). *totalrows receives
 * astate.samplerows (every row seen) and *totaldeadrows is always 0.
 *
 * NOTE(review): a failed FETCH reports sql.data, which is the full
 * "DECLARE c<N> CURSOR FOR ..." text rather than just the SELECT.
 * Deletion of astate.temp_cxt is not visible in this listing.
 */
2320 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
2321 HeapTuple *rows, int targrows,
2323 double *totaldeadrows)
2325 PgFdwAnalyzeState astate;
2326 ForeignTable *table;
2327 ForeignServer *server;
2330 unsigned int cursor_number;
2332 PGresult *volatile res = NULL;
2334 /* Initialize workspace state */
2335 astate.rel = relation;
2336 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
2339 astate.targrows = targrows;
2341 astate.samplerows = 0;
2342 astate.rowstoskip = -1; /* -1 means not set yet */
2343 astate.rstate = anl_init_selection_state(targrows);
2345 /* Remember ANALYZE context, and create a per-tuple temp context */
2346 astate.anl_cxt = CurrentMemoryContext;
2347 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
2348 "postgres_fdw temporary data",
2349 ALLOCSET_SMALL_MINSIZE,
2350 ALLOCSET_SMALL_INITSIZE,
2351 ALLOCSET_SMALL_MAXSIZE);
2354 * Get the connection to use. We do the remote access as the table's
2355 * owner, even if the ANALYZE was started by some other user.
2357 table = GetForeignTable(RelationGetRelid(relation));
2358 server = GetForeignServer(table->serverid);
2359 user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
2360 conn = GetConnection(server, user, false);
2363 * Construct cursor that retrieves whole rows from remote.
2365 cursor_number = GetCursorNumber(conn);
2366 initStringInfo(&sql);
2367 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
2368 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
2370 /* In what follows, do not risk leaking any PGresults. */
2373 res = PQexec(conn, sql.data);
2374 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2375 pgfdw_report_error(ERROR, res, conn, false, sql.data);
2379 /* Retrieve and process rows a batch at a time. */
2387 /* Allow users to cancel long query */
2388 CHECK_FOR_INTERRUPTS();
2391 * XXX possible future improvement: if rowstoskip is large, we
2392 * could issue a MOVE rather than physically fetching the rows,
2393 * then just adjust rowstoskip and samplerows appropriately.
2396 /* The fetch size is arbitrary, but shouldn't be enormous. */
2399 /* Fetch some rows */
2400 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
2401 fetch_size, cursor_number);
2403 res = PQexec(conn, fetch_sql);
2404 /* On error, report the original query, not the FETCH. */
2405 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2406 pgfdw_report_error(ERROR, res, conn, false, sql.data);
2408 /* Process whatever we got. */
2409 numrows = PQntuples(res);
2410 for (i = 0; i < numrows; i++)
2411 analyze_row_processor(res, i, &astate);
2416 /* Must be EOF if we didn't get all the rows requested. */
2417 if (numrows < fetch_size)
2421 /* Close the cursor, just to be tidy. */
2422 close_cursor(conn, cursor_number);
2432 ReleaseConnection(conn);
2434 /* We assume that we have no dead tuple. */
2435 *totaldeadrows = 0.0;
2437 /* We've retrieved all living tuples from foreign server. */
2438 *totalrows = astate.samplerows;
2441 * Emit some interesting relation info
2444 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
2445 RelationGetRelationName(relation),
2446 astate.samplerows, astate.numrows)));
2448 return astate.numrows;
2452 * Collect sample rows from the result of query.
2453 * - Use all tuples in sample until target # of samples are collected.
2454 * - Subsequently, replace already-sampled tuples randomly.
/*
 * Called once per remote result row (res, row).
 *
 * astate->samplerows counts every row seen. The first astate->targrows rows
 * fill astate->rows in order; astate->numrows tracks how many are filled.
 *
 * After that, a negative astate->rowstoskip triggers computing a new skip
 * count with anl_get_next_S. Rows are skipped while that count is positive;
 * otherwise the row replaces a randomly chosen existing sample, and the
 * evicted tuple is freed.
 *
 * Sample tuples are built in the ANALYZE memory context (astate->anl_cxt),
 * not in the per-tuple temp context.
 */
2457 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
2459 int targrows = astate->targrows;
2460 int pos; /* array index to store tuple in */
2461 MemoryContext oldcontext;
2463 /* Always increment sample row counter. */
2464 astate->samplerows += 1;
2467 * Determine the slot where this sample row should be stored. Set pos to
2468 * negative value to indicate the row should be skipped.
2470 if (astate->numrows < targrows)
2472 /* First targrows rows are always included into the sample */
2473 pos = astate->numrows++;
2478 * Now we start replacing tuples in the sample until we reach the end
2479 * of the relation. Same algorithm as in acquire_sample_rows in
2480 * analyze.c; see Jeff Vitter's paper.
2482 if (astate->rowstoskip < 0)
2483 astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows,
2486 if (astate->rowstoskip <= 0)
2488 /* Choose a random reservoir element to replace. */
2489 pos = (int) (targrows * anl_random_fract());
2490 Assert(pos >= 0 && pos < targrows);
/* Release the evicted tuple; its slot is overwritten with the new row below. */
2491 heap_freetuple(astate->rows[pos]);
2495 /* Skip this tuple. */
2499 astate->rowstoskip -= 1;
2505 * Create sample tuple from current result row, and store it in the
2506 * position determined above. The tuple has to be created in anl_cxt.
2508 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
2510 astate->rows[pos] = make_tuple_from_result_row(res, row,
2513 astate->retrieved_attrs,
2516 MemoryContextSwitchTo(oldcontext);
2521 * Create a tuple from the specified row of the PGresult.
2523 * rel is the local representation of the foreign table, attinmeta is
2524 * conversion data for the rel's tupdesc, and retrieved_attrs is an
2525 * integer list of the table column numbers present in the PGresult.
2526 * temp_context is a working context that can be reset after each tuple.
/*
 * Memory and lifetime: all per-column work happens in temp_context, which
 * is reset before returning. This covers the values/nulls arrays, converted
 * datums and anything the I/O functions allocate. Only the heap_form_tuple
 * result is allocated in the caller's memory context.
 *
 * Table columns that do not appear in retrieved_attrs come back as NULL.
 * - Ordinary columns are converted with their type input function.
 * - A SelfItemPointerAttributeNumber entry is parsed as ctid and becomes
 *   the tuple's t_self.
 * - Other system columns are ignored.
 *
 * While a column is being converted, conversion_error_callback is installed
 * so that errors report the column and table. Raises ERROR if at least one
 * column was retrieved and the count differs from PQnfields(res).
 */
2529 make_tuple_from_result_row(PGresult *res,
2532 AttInMetadata *attinmeta,
2533 List *retrieved_attrs,
2534 MemoryContext temp_context)
2537 TupleDesc tupdesc = RelationGetDescr(rel);
2540 ItemPointer ctid = NULL;
2541 ConversionLocation errpos;
2542 ErrorContextCallback errcallback;
2543 MemoryContext oldcontext;
2547 Assert(row < PQntuples(res));
2550 * Do the following work in a temp context that we reset after each tuple.
2551 * This cleans up not only the data we have direct access to, but any
2552 * cruft the I/O functions might leak.
2554 oldcontext = MemoryContextSwitchTo(temp_context);
2556 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
2557 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
2558 /* Initialize to nulls for any columns not present in result */
2559 memset(nulls, true, tupdesc->natts * sizeof(bool));
2562 * Set up and install callback to report where conversion error occurs.
2565 errpos.cur_attno = 0;
2566 errcallback.callback = conversion_error_callback;
2567 errcallback.arg = (void *) &errpos;
2568 errcallback.previous = error_context_stack;
2569 error_context_stack = &errcallback;
2572 * i indexes columns in the relation, j indexes columns in the PGresult.
2575 foreach(lc, retrieved_attrs)
2577 int i = lfirst_int(lc);
2580 /* fetch next column's textual value */
2581 if (PQgetisnull(res, row, j))
2584 valstr = PQgetvalue(res, row, j);
2586 /* convert value to internal representation */
2589 /* ordinary column */
2590 Assert(i <= tupdesc->natts);
2591 nulls[i - 1] = (valstr == NULL);
2592 /* Apply the input function even to nulls, to support domains */
/*
 * Bracket the conversion with cur_attno so that the error callback names
 * this column only while its input function is running.
 */
2593 errpos.cur_attno = i;
2594 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
2596 attinmeta->attioparams[i - 1],
2597 attinmeta->atttypmods[i - 1]);
2598 errpos.cur_attno = 0;
2600 else if (i == SelfItemPointerAttributeNumber)
2602 /* ctid --- note we ignore any other system column in result */
2607 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
2608 ctid = (ItemPointer) DatumGetPointer(datum);
2615 /* Uninstall error context callback. */
2616 error_context_stack = errcallback.previous;
2619 * Check we got the expected number of columns. Note: j == 0 and
2620 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
2622 if (j > 0 && j != PQnfields(res))
2623 elog(ERROR, "remote query result does not match the foreign table");
2626 * Build the result tuple in caller's memory context.
2628 MemoryContextSwitchTo(oldcontext);
2630 tuple = heap_form_tuple(tupdesc, values, nulls);
/*
 * Copy ctid into t_self before temp_context is reset below. ctid points at
 * memory obtained while temp_context was current.
 *
 * NOTE(review): ctid starts as NULL and is only set when the result contains
 * a ctid column, so this dereference must be guarded (if (ctid)). That guard
 * is on a line omitted from this listing; confirm it exists in the full
 * source.
 */
2633 tuple->t_self = *ctid;
2636 MemoryContextReset(temp_context);
2642 * Callback function which is called when error occurs during column value
2643 * conversion. Print names of column and relation.
2646 conversion_error_callback(void *arg)
2648 ConversionLocation *errpos = (ConversionLocation *) arg;
2649 TupleDesc tupdesc = RelationGetDescr(errpos->rel);
2651 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
2652 errcontext("column \"%s\" of foreign table \"%s\"",
2653 NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
2654 RelationGetRelationName(errpos->rel));