]> granicus.if.org Git - postgresql/blob - contrib/postgres_fdw/postgres_fdw.c
Improve connection-failure error handling in contrib/postgres_fdw.
[postgresql] / contrib / postgres_fdw / postgres_fdw.c
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  *                Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *                contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14
15 #include "postgres_fdw.h"
16
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "commands/defrem.h"
20 #include "commands/explain.h"
21 #include "commands/vacuum.h"
22 #include "foreign/fdwapi.h"
23 #include "funcapi.h"
24 #include "miscadmin.h"
25 #include "nodes/makefuncs.h"
26 #include "nodes/nodeFuncs.h"
27 #include "optimizer/cost.h"
28 #include "optimizer/pathnode.h"
29 #include "optimizer/paths.h"
30 #include "optimizer/planmain.h"
31 #include "optimizer/prep.h"
32 #include "optimizer/restrictinfo.h"
33 #include "optimizer/var.h"
34 #include "parser/parsetree.h"
35 #include "utils/builtins.h"
36 #include "utils/guc.h"
37 #include "utils/lsyscache.h"
38 #include "utils/memutils.h"
39
40
41 PG_MODULE_MAGIC;
42
43 /* Default CPU cost to start up a foreign query. */
44 #define DEFAULT_FDW_STARTUP_COST        100.0
45
46 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
47 #define DEFAULT_FDW_TUPLE_COST          0.01
48
49 /*
50  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
51  * foreign table.  This information is collected by postgresGetForeignRelSize.
52  */
53 typedef struct PgFdwRelationInfo
54 {
55         /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */
56         List       *remote_conds;
57         List       *local_conds;
58
59         /* Bitmap of attr numbers we need to fetch from the remote server. */
60         Bitmapset  *attrs_used;
61
62         /* Cost and selectivity of local_conds. */
63         QualCost        local_conds_cost;
64         Selectivity local_conds_sel;
65
66         /* Estimated size and cost for a scan with baserestrictinfo quals. */
67         double          rows;
68         int                     width;
69         Cost            startup_cost;
70         Cost            total_cost;
71
72         /* Options extracted from catalogs. */
73         bool            use_remote_estimate;
74         Cost            fdw_startup_cost;
75         Cost            fdw_tuple_cost;
76
77         /* Cached catalog information. */
78         ForeignTable *table;
79         ForeignServer *server;
80         UserMapping *user;                      /* only set in use_remote_estimate mode */
81 } PgFdwRelationInfo;
82
83 /*
84  * Indexes of FDW-private information stored in fdw_private lists.
85  *
86  * We store various information in ForeignScan.fdw_private to pass it from
87  * planner to executor.  Currently we store:
88  *
89  * 1) SELECT statement text to be sent to the remote server
90  * 2) Integer list of attribute numbers retrieved by the SELECT
91  *
92  * These items are indexed with the enum FdwScanPrivateIndex, so an item
93  * can be fetched with list_nth().      For example, to get the SELECT statement:
94  *              sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
95  */
96 enum FdwScanPrivateIndex
97 {
98         /* SQL statement to execute remotely (as a String node) */
99         FdwScanPrivateSelectSql,
100         /* Integer list of attribute numbers retrieved by the SELECT */
101         FdwScanPrivateRetrievedAttrs
102 };
103
104 /*
105  * Similarly, this enum describes what's kept in the fdw_private list for
106  * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
107  *
108  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
109  * 2) Integer list of target attribute numbers for INSERT/UPDATE
110  *        (NIL for a DELETE)
111  * 3) Boolean flag showing if there's a RETURNING clause
112  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
113  */
114 enum FdwModifyPrivateIndex
115 {
116         /* SQL statement to execute remotely (as a String node) */
117         FdwModifyPrivateUpdateSql,
118         /* Integer list of target attribute numbers for INSERT/UPDATE */
119         FdwModifyPrivateTargetAttnums,
120         /* has-returning flag (as an integer Value node) */
121         FdwModifyPrivateHasReturning,
122         /* Integer list of attribute numbers retrieved by RETURNING */
123         FdwModifyPrivateRetrievedAttrs
124 };
125
126 /*
127  * Execution state of a foreign scan using postgres_fdw.
128  */
129 typedef struct PgFdwScanState
130 {
131         Relation        rel;                    /* relcache entry for the foreign table */
132         AttInMetadata *attinmeta;       /* attribute datatype conversion metadata */
133
134         /* extracted fdw_private data */
135         char       *query;                      /* text of SELECT command */
136         List       *retrieved_attrs;    /* list of retrieved attribute numbers */
137
138         /* for remote query execution */
139         PGconn     *conn;                       /* connection for the scan */
140         unsigned int cursor_number; /* quasi-unique ID for my cursor */
141         bool            cursor_exists;  /* have we created the cursor? */
142         int                     numParams;              /* number of parameters passed to query */
143         FmgrInfo   *param_flinfo;       /* output conversion functions for them */
144         List       *param_exprs;        /* executable expressions for param values */
145         const char **param_values;      /* textual values of query parameters */
146
147         /* for storing result tuples */
148         HeapTuple  *tuples;                     /* array of currently-retrieved tuples */
149         int                     num_tuples;             /* # of tuples in array */
150         int                     next_tuple;             /* index of next one to return */
151
152         /* batch-level state, for optimizing rewinds and avoiding useless fetch */
153         int                     fetch_ct_2;             /* Min(# of fetches done, 2) */
154         bool            eof_reached;    /* true if last fetch reached EOF */
155
156         /* working memory contexts */
157         MemoryContext batch_cxt;        /* context holding current batch of tuples */
158         MemoryContext temp_cxt;         /* context for per-tuple temporary data */
159 } PgFdwScanState;
160
161 /*
162  * Execution state of a foreign insert/update/delete operation.
163  */
164 typedef struct PgFdwModifyState
165 {
166         Relation        rel;                    /* relcache entry for the foreign table */
167         AttInMetadata *attinmeta;       /* attribute datatype conversion metadata */
168
169         /* for remote query execution */
170         PGconn     *conn;                       /* connection for the scan */
171         char       *p_name;                     /* name of prepared statement, if created */
172
173         /* extracted fdw_private data */
174         char       *query;                      /* text of INSERT/UPDATE/DELETE command */
175         List       *target_attrs;       /* list of target attribute numbers */
176         bool            has_returning;  /* is there a RETURNING clause? */
177         List       *retrieved_attrs;    /* attr numbers retrieved by RETURNING */
178
179         /* info about parameters for prepared statement */
180         AttrNumber      ctidAttno;              /* attnum of input resjunk ctid column */
181         int                     p_nums;                 /* number of parameters to transmit */
182         FmgrInfo   *p_flinfo;           /* output conversion functions for them */
183
184         /* working memory context */
185         MemoryContext temp_cxt;         /* context for per-tuple temporary data */
186 } PgFdwModifyState;
187
188 /*
189  * Workspace for analyzing a foreign table.
190  */
191 typedef struct PgFdwAnalyzeState
192 {
193         Relation        rel;                    /* relcache entry for the foreign table */
194         AttInMetadata *attinmeta;       /* attribute datatype conversion metadata */
195         List       *retrieved_attrs;    /* attr numbers retrieved by query */
196
197         /* collected sample rows */
198         HeapTuple  *rows;                       /* array of size targrows */
199         int                     targrows;               /* target # of sample rows */
200         int                     numrows;                /* # of sample rows collected */
201
202         /* for random sampling */
203         double          samplerows;             /* # of rows fetched */
204         double          rowstoskip;             /* # of rows to skip before next sample */
205         double          rstate;                 /* random state */
206
207         /* working memory contexts */
208         MemoryContext anl_cxt;          /* context for per-analyze lifespan data */
209         MemoryContext temp_cxt;         /* context for per-tuple temporary data */
210 } PgFdwAnalyzeState;
211
212 /*
213  * Identify the attribute where data conversion fails.
214  */
215 typedef struct ConversionLocation
216 {
217         Relation        rel;                    /* foreign table's relcache entry */
218         AttrNumber      cur_attno;              /* attribute number being processed, or 0 */
219 } ConversionLocation;
220
221 /* Callback argument for ec_member_matches_foreign */
222 typedef struct
223 {
224         Expr       *current;            /* current expr, or NULL if not yet found */
225         List       *already_used;       /* expressions already dealt with */
226 } ec_member_foreign_arg;
227
228 /*
229  * SQL functions
230  */
231 extern Datum postgres_fdw_handler(PG_FUNCTION_ARGS);
232
233 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
234
235 /*
236  * FDW callback routines
237  */
238 static void postgresGetForeignRelSize(PlannerInfo *root,
239                                                   RelOptInfo *baserel,
240                                                   Oid foreigntableid);
241 static void postgresGetForeignPaths(PlannerInfo *root,
242                                                 RelOptInfo *baserel,
243                                                 Oid foreigntableid);
244 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
245                                            RelOptInfo *baserel,
246                                            Oid foreigntableid,
247                                            ForeignPath *best_path,
248                                            List *tlist,
249                                            List *scan_clauses);
250 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
251 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
252 static void postgresReScanForeignScan(ForeignScanState *node);
253 static void postgresEndForeignScan(ForeignScanState *node);
254 static void postgresAddForeignUpdateTargets(Query *parsetree,
255                                                                 RangeTblEntry *target_rte,
256                                                                 Relation target_relation);
257 static List *postgresPlanForeignModify(PlannerInfo *root,
258                                                   ModifyTable *plan,
259                                                   Index resultRelation,
260                                                   int subplan_index);
261 static void postgresBeginForeignModify(ModifyTableState *mtstate,
262                                                    ResultRelInfo *resultRelInfo,
263                                                    List *fdw_private,
264                                                    int subplan_index,
265                                                    int eflags);
266 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
267                                                   ResultRelInfo *resultRelInfo,
268                                                   TupleTableSlot *slot,
269                                                   TupleTableSlot *planSlot);
270 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
271                                                   ResultRelInfo *resultRelInfo,
272                                                   TupleTableSlot *slot,
273                                                   TupleTableSlot *planSlot);
274 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
275                                                   ResultRelInfo *resultRelInfo,
276                                                   TupleTableSlot *slot,
277                                                   TupleTableSlot *planSlot);
278 static void postgresEndForeignModify(EState *estate,
279                                                  ResultRelInfo *resultRelInfo);
280 static int      postgresIsForeignRelUpdatable(Relation rel);
281 static void postgresExplainForeignScan(ForeignScanState *node,
282                                                    ExplainState *es);
283 static void postgresExplainForeignModify(ModifyTableState *mtstate,
284                                                          ResultRelInfo *rinfo,
285                                                          List *fdw_private,
286                                                          int subplan_index,
287                                                          ExplainState *es);
288 static bool postgresAnalyzeForeignTable(Relation relation,
289                                                         AcquireSampleRowsFunc *func,
290                                                         BlockNumber *totalpages);
291
292 /*
293  * Helper functions
294  */
295 static void estimate_path_cost_size(PlannerInfo *root,
296                                                 RelOptInfo *baserel,
297                                                 List *join_conds,
298                                                 double *p_rows, int *p_width,
299                                                 Cost *p_startup_cost, Cost *p_total_cost);
300 static void get_remote_estimate(const char *sql,
301                                         PGconn *conn,
302                                         double *rows,
303                                         int *width,
304                                         Cost *startup_cost,
305                                         Cost *total_cost);
306 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
307                                                   EquivalenceClass *ec, EquivalenceMember *em,
308                                                   void *arg);
309 static void create_cursor(ForeignScanState *node);
310 static void fetch_more_data(ForeignScanState *node);
311 static void close_cursor(PGconn *conn, unsigned int cursor_number);
312 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
313 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
314                                                  ItemPointer tupleid,
315                                                  TupleTableSlot *slot);
316 static void store_returning_result(PgFdwModifyState *fmstate,
317                                            TupleTableSlot *slot, PGresult *res);
318 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
319                                                           HeapTuple *rows, int targrows,
320                                                           double *totalrows,
321                                                           double *totaldeadrows);
322 static void analyze_row_processor(PGresult *res, int row,
323                                           PgFdwAnalyzeState *astate);
324 static HeapTuple make_tuple_from_result_row(PGresult *res,
325                                                    int row,
326                                                    Relation rel,
327                                                    AttInMetadata *attinmeta,
328                                                    List *retrieved_attrs,
329                                                    MemoryContext temp_context);
330 static void conversion_error_callback(void *arg);
331
332
333 /*
334  * Foreign-data wrapper handler function: return a struct with pointers
335  * to my callback routines.
336  */
337 Datum
338 postgres_fdw_handler(PG_FUNCTION_ARGS)
339 {
340         FdwRoutine *routine = makeNode(FdwRoutine);
341
342         /* Functions for scanning foreign tables */
343         routine->GetForeignRelSize = postgresGetForeignRelSize;
344         routine->GetForeignPaths = postgresGetForeignPaths;
345         routine->GetForeignPlan = postgresGetForeignPlan;
346         routine->BeginForeignScan = postgresBeginForeignScan;
347         routine->IterateForeignScan = postgresIterateForeignScan;
348         routine->ReScanForeignScan = postgresReScanForeignScan;
349         routine->EndForeignScan = postgresEndForeignScan;
350
351         /* Functions for updating foreign tables */
352         routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
353         routine->PlanForeignModify = postgresPlanForeignModify;
354         routine->BeginForeignModify = postgresBeginForeignModify;
355         routine->ExecForeignInsert = postgresExecForeignInsert;
356         routine->ExecForeignUpdate = postgresExecForeignUpdate;
357         routine->ExecForeignDelete = postgresExecForeignDelete;
358         routine->EndForeignModify = postgresEndForeignModify;
359         routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
360
361         /* Support functions for EXPLAIN */
362         routine->ExplainForeignScan = postgresExplainForeignScan;
363         routine->ExplainForeignModify = postgresExplainForeignModify;
364
365         /* Support functions for ANALYZE */
366         routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
367
368         PG_RETURN_POINTER(routine);
369 }
370
371 /*
372  * postgresGetForeignRelSize
373  *              Estimate # of rows and width of the result of the scan
374  *
375  * We should consider the effect of all baserestrictinfo clauses here, but
376  * not any join clauses.
377  */
378 static void
379 postgresGetForeignRelSize(PlannerInfo *root,
380                                                   RelOptInfo *baserel,
381                                                   Oid foreigntableid)
382 {
383         PgFdwRelationInfo *fpinfo;
384         ListCell   *lc;
385
386         /*
387          * We use PgFdwRelationInfo to pass various information to subsequent
388          * functions.
389          */
390         fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
391         baserel->fdw_private = (void *) fpinfo;
392
393         /* Look up foreign-table catalog info. */
394         fpinfo->table = GetForeignTable(foreigntableid);
395         fpinfo->server = GetForeignServer(fpinfo->table->serverid);
396
397         /*
398          * Extract user-settable option values.  Note that per-table setting of
399          * use_remote_estimate overrides per-server setting.
400          */
401         fpinfo->use_remote_estimate = false;
402         fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
403         fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
404
405         foreach(lc, fpinfo->server->options)
406         {
407                 DefElem    *def = (DefElem *) lfirst(lc);
408
409                 if (strcmp(def->defname, "use_remote_estimate") == 0)
410                         fpinfo->use_remote_estimate = defGetBoolean(def);
411                 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
412                         fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
413                 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
414                         fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
415         }
416         foreach(lc, fpinfo->table->options)
417         {
418                 DefElem    *def = (DefElem *) lfirst(lc);
419
420                 if (strcmp(def->defname, "use_remote_estimate") == 0)
421                 {
422                         fpinfo->use_remote_estimate = defGetBoolean(def);
423                         break;                          /* only need the one value */
424                 }
425         }
426
427         /*
428          * If the table or the server is configured to use remote estimates,
429          * identify which user to do remote access as during planning.  This
430          * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
431          * permissions, the query would have failed at runtime anyway.
432          */
433         if (fpinfo->use_remote_estimate)
434         {
435                 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
436                 Oid                     userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
437
438                 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
439         }
440         else
441                 fpinfo->user = NULL;
442
443         /*
444          * Identify which baserestrictinfo clauses can be sent to the remote
445          * server and which can't.
446          */
447         classifyConditions(root, baserel,
448                                            &fpinfo->remote_conds, &fpinfo->local_conds);
449
450         /*
451          * Identify which attributes will need to be retrieved from the remote
452          * server.      These include all attrs needed for joins or final output, plus
453          * all attrs used in the local_conds.  (Note: if we end up using a
454          * parameterized scan, it's possible that some of the join clauses will be
455          * sent to the remote and thus we wouldn't really need to retrieve the
456          * columns used in them.  Doesn't seem worth detecting that case though.)
457          */
458         fpinfo->attrs_used = NULL;
459         pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
460                                    &fpinfo->attrs_used);
461         foreach(lc, fpinfo->local_conds)
462         {
463                 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
464
465                 pull_varattnos((Node *) rinfo->clause, baserel->relid,
466                                            &fpinfo->attrs_used);
467         }
468
469         /*
470          * Compute the selectivity and cost of the local_conds, so we don't have
471          * to do it over again for each path.  The best we can do for these
472          * conditions is to estimate selectivity on the basis of local statistics.
473          */
474         fpinfo->local_conds_sel = clauselist_selectivity(root,
475                                                                                                          fpinfo->local_conds,
476                                                                                                          baserel->relid,
477                                                                                                          JOIN_INNER,
478                                                                                                          NULL);
479
480         cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
481
482         /*
483          * If the table or the server is configured to use remote estimates,
484          * connect to the foreign server and execute EXPLAIN to estimate the
485          * number of rows selected by the restriction clauses, as well as the
486          * average row width.  Otherwise, estimate using whatever statistics we
487          * have locally, in a way similar to ordinary tables.
488          */
489         if (fpinfo->use_remote_estimate)
490         {
491                 /*
492                  * Get cost/size estimates with help of remote server.  Save the
493                  * values in fpinfo so we don't need to do it again to generate the
494                  * basic foreign path.
495                  */
496                 estimate_path_cost_size(root, baserel, NIL,
497                                                                 &fpinfo->rows, &fpinfo->width,
498                                                                 &fpinfo->startup_cost, &fpinfo->total_cost);
499
500                 /* Report estimated baserel size to planner. */
501                 baserel->rows = fpinfo->rows;
502                 baserel->width = fpinfo->width;
503         }
504         else
505         {
506                 /*
507                  * If the foreign table has never been ANALYZEd, it will have relpages
508                  * and reltuples equal to zero, which most likely has nothing to do
509                  * with reality.  We can't do a whole lot about that if we're not
510                  * allowed to consult the remote server, but we can use a hack similar
511                  * to plancat.c's treatment of empty relations: use a minimum size
512                  * estimate of 10 pages, and divide by the column-datatype-based width
513                  * estimate to get the corresponding number of tuples.
514                  */
515                 if (baserel->pages == 0 && baserel->tuples == 0)
516                 {
517                         baserel->pages = 10;
518                         baserel->tuples =
519                                 (10 * BLCKSZ) / (baserel->width + sizeof(HeapTupleHeaderData));
520                 }
521
522                 /* Estimate baserel size as best we can with local statistics. */
523                 set_baserel_size_estimates(root, baserel);
524
525                 /* Fill in basically-bogus cost estimates for use later. */
526                 estimate_path_cost_size(root, baserel, NIL,
527                                                                 &fpinfo->rows, &fpinfo->width,
528                                                                 &fpinfo->startup_cost, &fpinfo->total_cost);
529         }
530 }
531
532 /*
533  * postgresGetForeignPaths
534  *              Create possible scan paths for a scan on the foreign table
535  */
536 static void
537 postgresGetForeignPaths(PlannerInfo *root,
538                                                 RelOptInfo *baserel,
539                                                 Oid foreigntableid)
540 {
541         PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
542         ForeignPath *path;
543         List       *join_quals;
544         Relids          required_outer;
545         double          rows;
546         int                     width;
547         Cost            startup_cost;
548         Cost            total_cost;
549         ListCell   *lc;
550
551         /*
552          * Create simplest ForeignScan path node and add it to baserel.  This path
553          * corresponds to SeqScan path of regular tables (though depending on what
554          * baserestrict conditions we were able to send to remote, there might
555          * actually be an indexscan happening there).  We already did all the work
556          * to estimate cost and size of this path.
557          */
558         path = create_foreignscan_path(root, baserel,
559                                                                    fpinfo->rows,
560                                                                    fpinfo->startup_cost,
561                                                                    fpinfo->total_cost,
562                                                                    NIL, /* no pathkeys */
563                                                                    NULL,                /* no outer rel either */
564                                                                    NIL);                /* no fdw_private list */
565         add_path(baserel, (Path *) path);
566
567         /*
568          * If we're not using remote estimates, stop here.  We have no way to
569          * estimate whether any join clauses would be worth sending across, so
570          * don't bother building parameterized paths.
571          */
572         if (!fpinfo->use_remote_estimate)
573                 return;
574
575         /*
576          * As a crude first hack, we consider each available join clause and try
577          * to make a parameterized path using just that clause.  Later we should
578          * consider combinations of clauses, probably.
579          */
580
581         /* Scan the rel's join clauses */
582         foreach(lc, baserel->joininfo)
583         {
584                 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
585
586                 /* Check if clause can be moved to this rel */
587                 if (!join_clause_is_movable_to(rinfo, baserel))
588                         continue;
589
590                 /* See if it is safe to send to remote */
591                 if (!is_foreign_expr(root, baserel, rinfo->clause))
592                         continue;
593
594                 /*
595                  * OK, get a cost estimate from the remote, and make a path.
596                  */
597                 join_quals = list_make1(rinfo);
598                 estimate_path_cost_size(root, baserel, join_quals,
599                                                                 &rows, &width,
600                                                                 &startup_cost, &total_cost);
601
602                 /* Must calculate required outer rels for this path */
603                 required_outer = bms_union(rinfo->clause_relids,
604                                                                    baserel->lateral_relids);
605                 /* We do not want the foreign rel itself listed in required_outer */
606                 required_outer = bms_del_member(required_outer, baserel->relid);
607                 /* Enforce convention that required_outer is exactly NULL if empty */
608                 if (bms_is_empty(required_outer))
609                         required_outer = NULL;
610
611                 path = create_foreignscan_path(root, baserel,
612                                                                            rows,
613                                                                            startup_cost,
614                                                                            total_cost,
615                                                                            NIL,         /* no pathkeys */
616                                                                            required_outer,
617                                                                            NIL);        /* no fdw_private list */
618                 add_path(baserel, (Path *) path);
619         }
620
621         /*
622          * The above scan examined only "generic" join clauses, not those that
623          * were absorbed into EquivalenceClauses.  See if we can make anything out
624          * of EquivalenceClauses.
625          */
626         if (baserel->has_eclass_joins)
627         {
628                 /*
629                  * We repeatedly scan the eclass list looking for column references
630                  * (or expressions) belonging to the foreign rel.  Each time we find
631                  * one, we generate a list of equivalence joinclauses for it, and then
632                  * try to make those into foreign paths.  Repeat till there are no
633                  * more candidate EC members.
634                  */
635                 ec_member_foreign_arg arg;
636
637                 arg.already_used = NIL;
638                 for (;;)
639                 {
640                         List       *clauses;
641
642                         /* Make clauses, skipping any that join to lateral_referencers */
643                         arg.current = NULL;
644                         clauses = generate_implied_equalities_for_column(root,
645                                                                                                                          baserel,
646                                                                                                    ec_member_matches_foreign,
647                                                                                                                          (void *) &arg,
648                                                                                            baserel->lateral_referencers);
649
650                         /* Done if there are no more expressions in the foreign rel */
651                         if (arg.current == NULL)
652                         {
653                                 Assert(clauses == NIL);
654                                 break;
655                         }
656
657                         /* Scan the extracted join clauses */
658                         foreach(lc, clauses)
659                         {
660                                 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
661
662                                 /* Check if clause can be moved to this rel */
663                                 if (!join_clause_is_movable_to(rinfo, baserel))
664                                         continue;
665
666                                 /* See if it is safe to send to remote */
667                                 if (!is_foreign_expr(root, baserel, rinfo->clause))
668                                         continue;
669
670                                 /*
671                                  * OK, get a cost estimate from the remote, and make a path.
672                                  */
673                                 join_quals = list_make1(rinfo);
674                                 estimate_path_cost_size(root, baserel, join_quals,
675                                                                                 &rows, &width,
676                                                                                 &startup_cost, &total_cost);
677
678                                 /* Must calculate required outer rels for this path */
679                                 required_outer = bms_union(rinfo->clause_relids,
680                                                                                    baserel->lateral_relids);
681                                 required_outer = bms_del_member(required_outer, baserel->relid);
682                                 if (bms_is_empty(required_outer))
683                                         required_outer = NULL;
684
685                                 path = create_foreignscan_path(root, baserel,
686                                                                                            rows,
687                                                                                            startup_cost,
688                                                                                            total_cost,
689                                                                                            NIL,         /* no pathkeys */
690                                                                                            required_outer,
691                                                                                            NIL);        /* no fdw_private */
692                                 add_path(baserel, (Path *) path);
693                         }
694
695                         /* Try again, now ignoring the expression we found this time */
696                         arg.already_used = lappend(arg.already_used, arg.current);
697                 }
698         }
699 }
700
701 /*
702  * postgresGetForeignPlan
703  *              Create ForeignScan plan node which implements selected best path
704  */
705 static ForeignScan *
706 postgresGetForeignPlan(PlannerInfo *root,
707                                            RelOptInfo *baserel,
708                                            Oid foreigntableid,
709                                            ForeignPath *best_path,
710                                            List *tlist,
711                                            List *scan_clauses)
712 {
713         PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
714         Index           scan_relid = baserel->relid;
715         List       *fdw_private;
716         List       *remote_conds = NIL;
717         List       *local_exprs = NIL;
718         List       *params_list = NIL;
719         List       *retrieved_attrs;
720         StringInfoData sql;
721         ListCell   *lc;
722
723         /*
724          * Separate the scan_clauses into those that can be executed remotely and
725          * those that can't.  baserestrictinfo clauses that were previously
726          * determined to be safe or unsafe by classifyConditions are shown in
727          * fpinfo->remote_conds and fpinfo->local_conds.  Anything else in the
728          * scan_clauses list should be a join clause that was found safe by
729          * postgresGetForeignPaths.
730          *
731          * Note: for clauses extracted from EquivalenceClasses, it's possible that
732          * what we get here is a different representation of the clause than what
733          * postgresGetForeignPaths saw; for example we might get a commuted
734          * version of the clause.  So we can't insist on simple equality as we do
735          * for the baserestrictinfo clauses.
736          *
737          * This code must match "extract_actual_clauses(scan_clauses, false)"
738          * except for the additional decision about remote versus local execution.
739          * Note however that we only strip the RestrictInfo nodes from the
740          * local_exprs list, since appendWhereClause expects a list of
741          * RestrictInfos.
742          */
743         foreach(lc, scan_clauses)
744         {
745                 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
746
747                 Assert(IsA(rinfo, RestrictInfo));
748
749                 /* Ignore any pseudoconstants, they're dealt with elsewhere */
750                 if (rinfo->pseudoconstant)
751                         continue;
752
753                 if (list_member_ptr(fpinfo->remote_conds, rinfo))
754                         remote_conds = lappend(remote_conds, rinfo);
755                 else if (list_member_ptr(fpinfo->local_conds, rinfo))
756                         local_exprs = lappend(local_exprs, rinfo->clause);
757                 else
758                 {
759                         Assert(is_foreign_expr(root, baserel, rinfo->clause));
760                         remote_conds = lappend(remote_conds, rinfo);
761                 }
762         }
763
764         /*
765          * Build the query string to be sent for execution, and identify
766          * expressions to be sent as parameters.
767          */
768         initStringInfo(&sql);
769         deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
770                                          &retrieved_attrs);
771         if (remote_conds)
772                 appendWhereClause(&sql, root, baserel, remote_conds,
773                                                   true, &params_list);
774
775         /*
776          * Add FOR UPDATE/SHARE if appropriate.  We apply locking during the
777          * initial row fetch, rather than later on as is done for local tables.
778          * The extra roundtrips involved in trying to duplicate the local
779          * semantics exactly don't seem worthwhile (see also comments for
780          * RowMarkType).
781          *
782          * Note: because we actually run the query as a cursor, this assumes that
783          * DECLARE CURSOR ... FOR UPDATE is supported, which it isn't before 8.3.
784          */
785         if (baserel->relid == root->parse->resultRelation &&
786                 (root->parse->commandType == CMD_UPDATE ||
787                  root->parse->commandType == CMD_DELETE))
788         {
789                 /* Relation is UPDATE/DELETE target, so use FOR UPDATE */
790                 appendStringInfoString(&sql, " FOR UPDATE");
791         }
792         else
793         {
794                 RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid);
795
796                 if (rc)
797                 {
798                         /*
799                          * Relation is specified as a FOR UPDATE/SHARE target, so handle
800                          * that.
801                          *
802                          * For now, just ignore any [NO] KEY specification, since (a) it's
803                          * not clear what that means for a remote table that we don't have
804                          * complete information about, and (b) it wouldn't work anyway on
805                          * older remote servers.  Likewise, we don't worry about NOWAIT.
806                          */
807                         switch (rc->strength)
808                         {
809                                 case LCS_FORKEYSHARE:
810                                 case LCS_FORSHARE:
811                                         appendStringInfoString(&sql, " FOR SHARE");
812                                         break;
813                                 case LCS_FORNOKEYUPDATE:
814                                 case LCS_FORUPDATE:
815                                         appendStringInfoString(&sql, " FOR UPDATE");
816                                         break;
817                         }
818                 }
819         }
820
821         /*
822          * Build the fdw_private list that will be available to the executor.
823          * Items in the list must match enum FdwScanPrivateIndex, above.
824          */
825         fdw_private = list_make2(makeString(sql.data),
826                                                          retrieved_attrs);
827
828         /*
829          * Create the ForeignScan node from target list, local filtering
830          * expressions, remote parameter expressions, and FDW private information.
831          *
832          * Note that the remote parameter expressions are stored in the fdw_exprs
833          * field of the finished plan node; we can't keep them in private state
834          * because then they wouldn't be subject to later planner processing.
835          */
836         return make_foreignscan(tlist,
837                                                         local_exprs,
838                                                         scan_relid,
839                                                         params_list,
840                                                         fdw_private);
841 }
842
843 /*
844  * postgresBeginForeignScan
845  *              Initiate an executor scan of a foreign PostgreSQL table.
846  */
847 static void
848 postgresBeginForeignScan(ForeignScanState *node, int eflags)
849 {
850         ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
851         EState     *estate = node->ss.ps.state;
852         PgFdwScanState *fsstate;
853         RangeTblEntry *rte;
854         Oid                     userid;
855         ForeignTable *table;
856         ForeignServer *server;
857         UserMapping *user;
858         int                     numParams;
859         int                     i;
860         ListCell   *lc;
861
862         /*
863          * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
864          */
865         if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
866                 return;
867
868         /*
869          * We'll save private state in node->fdw_state.
870          */
871         fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
872         node->fdw_state = (void *) fsstate;
873
874         /*
875          * Identify which user to do the remote access as.      This should match what
876          * ExecCheckRTEPerms() does.
877          */
878         rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
879         userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
880
881         /* Get info about foreign table. */
882         fsstate->rel = node->ss.ss_currentRelation;
883         table = GetForeignTable(RelationGetRelid(fsstate->rel));
884         server = GetForeignServer(table->serverid);
885         user = GetUserMapping(userid, server->serverid);
886
887         /*
888          * Get connection to the foreign server.  Connection manager will
889          * establish new connection if necessary.
890          */
891         fsstate->conn = GetConnection(server, user, false);
892
893         /* Assign a unique ID for my cursor */
894         fsstate->cursor_number = GetCursorNumber(fsstate->conn);
895         fsstate->cursor_exists = false;
896
897         /* Get private info created by planner functions. */
898         fsstate->query = strVal(list_nth(fsplan->fdw_private,
899                                                                          FdwScanPrivateSelectSql));
900         fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
901                                                                                            FdwScanPrivateRetrievedAttrs);
902
903         /* Create contexts for batches of tuples and per-tuple temp workspace. */
904         fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
905                                                                                            "postgres_fdw tuple data",
906                                                                                            ALLOCSET_DEFAULT_MINSIZE,
907                                                                                            ALLOCSET_DEFAULT_INITSIZE,
908                                                                                            ALLOCSET_DEFAULT_MAXSIZE);
909         fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
910                                                                                           "postgres_fdw temporary data",
911                                                                                           ALLOCSET_SMALL_MINSIZE,
912                                                                                           ALLOCSET_SMALL_INITSIZE,
913                                                                                           ALLOCSET_SMALL_MAXSIZE);
914
915         /* Get info we'll need for input data conversion. */
916         fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
917
918         /* Prepare for output conversion of parameters used in remote query. */
919         numParams = list_length(fsplan->fdw_exprs);
920         fsstate->numParams = numParams;
921         fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
922
923         i = 0;
924         foreach(lc, fsplan->fdw_exprs)
925         {
926                 Node       *param_expr = (Node *) lfirst(lc);
927                 Oid                     typefnoid;
928                 bool            isvarlena;
929
930                 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
931                 fmgr_info(typefnoid, &fsstate->param_flinfo[i]);
932                 i++;
933         }
934
935         /*
936          * Prepare remote-parameter expressions for evaluation.  (Note: in
937          * practice, we expect that all these expressions will be just Params, so
938          * we could possibly do something more efficient than using the full
939          * expression-eval machinery for this.  But probably there would be little
940          * benefit, and it'd require postgres_fdw to know more than is desirable
941          * about Param evaluation.)
942          */
943         fsstate->param_exprs = (List *)
944                 ExecInitExpr((Expr *) fsplan->fdw_exprs,
945                                          (PlanState *) node);
946
947         /*
948          * Allocate buffer for text form of query parameters, if any.
949          */
950         if (numParams > 0)
951                 fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
952         else
953                 fsstate->param_values = NULL;
954 }
955
956 /*
957  * postgresIterateForeignScan
958  *              Retrieve next row from the result set, or clear tuple slot to indicate
959  *              EOF.
960  */
961 static TupleTableSlot *
962 postgresIterateForeignScan(ForeignScanState *node)
963 {
964         PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
965         TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
966
967         /*
968          * If this is the first call after Begin or ReScan, we need to create the
969          * cursor on the remote side.
970          */
971         if (!fsstate->cursor_exists)
972                 create_cursor(node);
973
974         /*
975          * Get some more tuples, if we've run out.
976          */
977         if (fsstate->next_tuple >= fsstate->num_tuples)
978         {
979                 /* No point in another fetch if we already detected EOF, though. */
980                 if (!fsstate->eof_reached)
981                         fetch_more_data(node);
982                 /* If we didn't get any tuples, must be end of data. */
983                 if (fsstate->next_tuple >= fsstate->num_tuples)
984                         return ExecClearTuple(slot);
985         }
986
987         /*
988          * Return the next tuple.
989          */
990         ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
991                                    slot,
992                                    InvalidBuffer,
993                                    false);
994
995         return slot;
996 }
997
998 /*
999  * postgresReScanForeignScan
1000  *              Restart the scan.
1001  */
1002 static void
1003 postgresReScanForeignScan(ForeignScanState *node)
1004 {
1005         PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1006         char            sql[64];
1007         PGresult   *res;
1008
1009         /* If we haven't created the cursor yet, nothing to do. */
1010         if (!fsstate->cursor_exists)
1011                 return;
1012
1013         /*
1014          * If any internal parameters affecting this node have changed, we'd
1015          * better destroy and recreate the cursor.      Otherwise, rewinding it should
1016          * be good enough.      If we've only fetched zero or one batch, we needn't
1017          * even rewind the cursor, just rescan what we have.
1018          */
1019         if (node->ss.ps.chgParam != NULL)
1020         {
1021                 fsstate->cursor_exists = false;
1022                 snprintf(sql, sizeof(sql), "CLOSE c%u",
1023                                  fsstate->cursor_number);
1024         }
1025         else if (fsstate->fetch_ct_2 > 1)
1026         {
1027                 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1028                                  fsstate->cursor_number);
1029         }
1030         else
1031         {
1032                 /* Easy: just rescan what we already have in memory, if anything */
1033                 fsstate->next_tuple = 0;
1034                 return;
1035         }
1036
1037         /*
1038          * We don't use a PG_TRY block here, so be careful not to throw error
1039          * without releasing the PGresult.
1040          */
1041         res = PQexec(fsstate->conn, sql);
1042         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1043                 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1044         PQclear(res);
1045
1046         /* Now force a fresh FETCH. */
1047         fsstate->tuples = NULL;
1048         fsstate->num_tuples = 0;
1049         fsstate->next_tuple = 0;
1050         fsstate->fetch_ct_2 = 0;
1051         fsstate->eof_reached = false;
1052 }
1053
1054 /*
1055  * postgresEndForeignScan
1056  *              Finish scanning foreign table and dispose objects used for this scan
1057  */
1058 static void
1059 postgresEndForeignScan(ForeignScanState *node)
1060 {
1061         PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1062
1063         /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1064         if (fsstate == NULL)
1065                 return;
1066
1067         /* Close the cursor if open, to prevent accumulation of cursors */
1068         if (fsstate->cursor_exists)
1069                 close_cursor(fsstate->conn, fsstate->cursor_number);
1070
1071         /* Release remote connection */
1072         ReleaseConnection(fsstate->conn);
1073         fsstate->conn = NULL;
1074
1075         /* MemoryContexts will be deleted automatically. */
1076 }
1077
1078 /*
1079  * postgresAddForeignUpdateTargets
1080  *              Add resjunk column(s) needed for update/delete on a foreign table
1081  */
1082 static void
1083 postgresAddForeignUpdateTargets(Query *parsetree,
1084                                                                 RangeTblEntry *target_rte,
1085                                                                 Relation target_relation)
1086 {
1087         Var                *var;
1088         const char *attrname;
1089         TargetEntry *tle;
1090
1091         /*
1092          * In postgres_fdw, what we need is the ctid, same as for a regular table.
1093          */
1094
1095         /* Make a Var representing the desired value */
1096         var = makeVar(parsetree->resultRelation,
1097                                   SelfItemPointerAttributeNumber,
1098                                   TIDOID,
1099                                   -1,
1100                                   InvalidOid,
1101                                   0);
1102
1103         /* Wrap it in a resjunk TLE with the right name ... */
1104         attrname = "ctid";
1105
1106         tle = makeTargetEntry((Expr *) var,
1107                                                   list_length(parsetree->targetList) + 1,
1108                                                   pstrdup(attrname),
1109                                                   true);
1110
1111         /* ... and add it to the query's targetlist */
1112         parsetree->targetList = lappend(parsetree->targetList, tle);
1113 }
1114
1115 /*
1116  * postgresPlanForeignModify
1117  *              Plan an insert/update/delete operation on a foreign table
1118  *
1119  * Note: currently, the plan tree generated for UPDATE/DELETE will always
1120  * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
1121  * and then the ModifyTable node will have to execute individual remote
1122  * UPDATE/DELETE commands.      If there are no local conditions or joins
1123  * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
1124  * and then do nothing at ModifyTable.  Room for future optimization ...
1125  */
1126 static List *
1127 postgresPlanForeignModify(PlannerInfo *root,
1128                                                   ModifyTable *plan,
1129                                                   Index resultRelation,
1130                                                   int subplan_index)
1131 {
1132         CmdType         operation = plan->operation;
1133         RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1134         Relation        rel;
1135         StringInfoData sql;
1136         List       *targetAttrs = NIL;
1137         List       *returningList = NIL;
1138         List       *retrieved_attrs = NIL;
1139
1140         initStringInfo(&sql);
1141
1142         /*
1143          * Core code already has some lock on each rel being planned, so we can
1144          * use NoLock here.
1145          */
1146         rel = heap_open(rte->relid, NoLock);
1147
1148         /*
1149          * In an INSERT, we transmit all columns that are defined in the foreign
1150          * table.  In an UPDATE, we transmit only columns that were explicitly
1151          * targets of the UPDATE, so as to avoid unnecessary data transmission.
1152          * (We can't do that for INSERT since we would miss sending default values
1153          * for columns not listed in the source statement.)
1154          */
1155         if (operation == CMD_INSERT)
1156         {
1157                 TupleDesc       tupdesc = RelationGetDescr(rel);
1158                 int                     attnum;
1159
1160                 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1161                 {
1162                         Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1163
1164                         if (!attr->attisdropped)
1165                                 targetAttrs = lappend_int(targetAttrs, attnum);
1166                 }
1167         }
1168         else if (operation == CMD_UPDATE)
1169         {
1170                 Bitmapset  *tmpset = bms_copy(rte->modifiedCols);
1171                 AttrNumber      col;
1172
1173                 while ((col = bms_first_member(tmpset)) >= 0)
1174                 {
1175                         col += FirstLowInvalidHeapAttributeNumber;
1176                         if (col <= InvalidAttrNumber)           /* shouldn't happen */
1177                                 elog(ERROR, "system-column update is not supported");
1178                         targetAttrs = lappend_int(targetAttrs, col);
1179                 }
1180         }
1181
1182         /*
1183          * Extract the relevant RETURNING list if any.
1184          */
1185         if (plan->returningLists)
1186                 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1187
1188         /*
1189          * Construct the SQL command string.
1190          */
1191         switch (operation)
1192         {
1193                 case CMD_INSERT:
1194                         deparseInsertSql(&sql, root, resultRelation, rel,
1195                                                          targetAttrs, returningList,
1196                                                          &retrieved_attrs);
1197                         break;
1198                 case CMD_UPDATE:
1199                         deparseUpdateSql(&sql, root, resultRelation, rel,
1200                                                          targetAttrs, returningList,
1201                                                          &retrieved_attrs);
1202                         break;
1203                 case CMD_DELETE:
1204                         deparseDeleteSql(&sql, root, resultRelation, rel,
1205                                                          returningList,
1206                                                          &retrieved_attrs);
1207                         break;
1208                 default:
1209                         elog(ERROR, "unexpected operation: %d", (int) operation);
1210                         break;
1211         }
1212
1213         heap_close(rel, NoLock);
1214
1215         /*
1216          * Build the fdw_private list that will be available to the executor.
1217          * Items in the list must match enum FdwModifyPrivateIndex, above.
1218          */
1219         return list_make4(makeString(sql.data),
1220                                           targetAttrs,
1221                                           makeInteger((returningList != NIL)),
1222                                           retrieved_attrs);
1223 }
1224
1225 /*
1226  * postgresBeginForeignModify
1227  *              Begin an insert/update/delete operation on a foreign table
1228  */
1229 static void
1230 postgresBeginForeignModify(ModifyTableState *mtstate,
1231                                                    ResultRelInfo *resultRelInfo,
1232                                                    List *fdw_private,
1233                                                    int subplan_index,
1234                                                    int eflags)
1235 {
1236         PgFdwModifyState *fmstate;
1237         EState     *estate = mtstate->ps.state;
1238         CmdType         operation = mtstate->operation;
1239         Relation        rel = resultRelInfo->ri_RelationDesc;
1240         RangeTblEntry *rte;
1241         Oid                     userid;
1242         ForeignTable *table;
1243         ForeignServer *server;
1244         UserMapping *user;
1245         AttrNumber      n_params;
1246         Oid                     typefnoid;
1247         bool            isvarlena;
1248         ListCell   *lc;
1249
1250         /*
1251          * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
1252          * stays NULL.
1253          */
1254         if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1255                 return;
1256
1257         /* Begin constructing PgFdwModifyState. */
1258         fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1259         fmstate->rel = rel;
1260
1261         /*
1262          * Identify which user to do the remote access as.      This should match what
1263          * ExecCheckRTEPerms() does.
1264          */
1265         rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1266         userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1267
1268         /* Get info about foreign table. */
1269         table = GetForeignTable(RelationGetRelid(rel));
1270         server = GetForeignServer(table->serverid);
1271         user = GetUserMapping(userid, server->serverid);
1272
1273         /* Open connection; report that we'll create a prepared statement. */
1274         fmstate->conn = GetConnection(server, user, true);
1275         fmstate->p_name = NULL;         /* prepared statement not made yet */
1276
1277         /* Deconstruct fdw_private data. */
1278         fmstate->query = strVal(list_nth(fdw_private,
1279                                                                          FdwModifyPrivateUpdateSql));
1280         fmstate->target_attrs = (List *) list_nth(fdw_private,
1281                                                                                           FdwModifyPrivateTargetAttnums);
1282         fmstate->has_returning = intVal(list_nth(fdw_private,
1283                                                                                          FdwModifyPrivateHasReturning));
1284         fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1285                                                                                          FdwModifyPrivateRetrievedAttrs);
1286
1287         /* Create context for per-tuple temp workspace. */
1288         fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1289                                                                                           "postgres_fdw temporary data",
1290                                                                                           ALLOCSET_SMALL_MINSIZE,
1291                                                                                           ALLOCSET_SMALL_INITSIZE,
1292                                                                                           ALLOCSET_SMALL_MAXSIZE);
1293
1294         /* Prepare for input conversion of RETURNING results. */
1295         if (fmstate->has_returning)
1296                 fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
1297
1298         /* Prepare for output conversion of parameters used in prepared stmt. */
1299         n_params = list_length(fmstate->target_attrs) + 1;
1300         fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1301         fmstate->p_nums = 0;
1302
1303         if (operation == CMD_UPDATE || operation == CMD_DELETE)
1304         {
1305                 /* Find the ctid resjunk column in the subplan's result */
1306                 Plan       *subplan = mtstate->mt_plans[subplan_index]->plan;
1307
1308                 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
1309                                                                                                                   "ctid");
1310                 if (!AttributeNumberIsValid(fmstate->ctidAttno))
1311                         elog(ERROR, "could not find junk ctid column");
1312
1313                 /* First transmittable parameter will be ctid */
1314                 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1315                 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1316                 fmstate->p_nums++;
1317         }
1318
1319         if (operation == CMD_INSERT || operation == CMD_UPDATE)
1320         {
1321                 /* Set up for remaining transmittable parameters */
1322                 foreach(lc, fmstate->target_attrs)
1323                 {
1324                         int                     attnum = lfirst_int(lc);
1325                         Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1326
1327                         Assert(!attr->attisdropped);
1328
1329                         getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1330                         fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1331                         fmstate->p_nums++;
1332                 }
1333         }
1334
1335         Assert(fmstate->p_nums <= n_params);
1336
1337         resultRelInfo->ri_FdwState = fmstate;
1338 }
1339
1340 /*
1341  * postgresExecForeignInsert
1342  *              Insert one row into a foreign table
1343  */
1344 static TupleTableSlot *
1345 postgresExecForeignInsert(EState *estate,
1346                                                   ResultRelInfo *resultRelInfo,
1347                                                   TupleTableSlot *slot,
1348                                                   TupleTableSlot *planSlot)
1349 {
1350         PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1351         const char **p_values;
1352         PGresult   *res;
1353         int                     n_rows;
1354
1355         /* Set up the prepared statement on the remote server, if we didn't yet */
1356         if (!fmstate->p_name)
1357                 prepare_foreign_modify(fmstate);
1358
1359         /* Convert parameters needed by prepared statement to text form */
1360         p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1361
1362         /*
1363          * Execute the prepared statement, and check for success.
1364          *
1365          * We don't use a PG_TRY block here, so be careful not to throw error
1366          * without releasing the PGresult.
1367          */
1368         res = PQexecPrepared(fmstate->conn,
1369                                                  fmstate->p_name,
1370                                                  fmstate->p_nums,
1371                                                  p_values,
1372                                                  NULL,
1373                                                  NULL,
1374                                                  0);
1375         if (PQresultStatus(res) !=
1376                 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1377                 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1378
1379         /* Check number of rows affected, and fetch RETURNING tuple if any */
1380         if (fmstate->has_returning)
1381         {
1382                 n_rows = PQntuples(res);
1383                 if (n_rows > 0)
1384                         store_returning_result(fmstate, slot, res);
1385         }
1386         else
1387                 n_rows = atoi(PQcmdTuples(res));
1388
1389         /* And clean up */
1390         PQclear(res);
1391
1392         MemoryContextReset(fmstate->temp_cxt);
1393
1394         /* Return NULL if nothing was inserted on the remote end */
1395         return (n_rows > 0) ? slot : NULL;
1396 }
1397
1398 /*
1399  * postgresExecForeignUpdate
1400  *              Update one row in a foreign table
1401  */
1402 static TupleTableSlot *
1403 postgresExecForeignUpdate(EState *estate,
1404                                                   ResultRelInfo *resultRelInfo,
1405                                                   TupleTableSlot *slot,
1406                                                   TupleTableSlot *planSlot)
1407 {
1408         PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1409         Datum           datum;
1410         bool            isNull;
1411         const char **p_values;
1412         PGresult   *res;
1413         int                     n_rows;
1414
1415         /* Set up the prepared statement on the remote server, if we didn't yet */
1416         if (!fmstate->p_name)
1417                 prepare_foreign_modify(fmstate);
1418
1419         /* Get the ctid that was passed up as a resjunk column */
1420         datum = ExecGetJunkAttribute(planSlot,
1421                                                                  fmstate->ctidAttno,
1422                                                                  &isNull);
1423         /* shouldn't ever get a null result... */
1424         if (isNull)
1425                 elog(ERROR, "ctid is NULL");
1426
1427         /* Convert parameters needed by prepared statement to text form */
1428         p_values = convert_prep_stmt_params(fmstate,
1429                                                                                 (ItemPointer) DatumGetPointer(datum),
1430                                                                                 slot);
1431
1432         /*
1433          * Execute the prepared statement, and check for success.
1434          *
1435          * We don't use a PG_TRY block here, so be careful not to throw error
1436          * without releasing the PGresult.
1437          */
1438         res = PQexecPrepared(fmstate->conn,
1439                                                  fmstate->p_name,
1440                                                  fmstate->p_nums,
1441                                                  p_values,
1442                                                  NULL,
1443                                                  NULL,
1444                                                  0);
1445         if (PQresultStatus(res) !=
1446                 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1447                 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1448
1449         /* Check number of rows affected, and fetch RETURNING tuple if any */
1450         if (fmstate->has_returning)
1451         {
1452                 n_rows = PQntuples(res);
1453                 if (n_rows > 0)
1454                         store_returning_result(fmstate, slot, res);
1455         }
1456         else
1457                 n_rows = atoi(PQcmdTuples(res));
1458
1459         /* And clean up */
1460         PQclear(res);
1461
1462         MemoryContextReset(fmstate->temp_cxt);
1463
1464         /* Return NULL if nothing was updated on the remote end */
1465         return (n_rows > 0) ? slot : NULL;
1466 }
1467
1468 /*
1469  * postgresExecForeignDelete
1470  *              Delete one row from a foreign table
1471  */
1472 static TupleTableSlot *
1473 postgresExecForeignDelete(EState *estate,
1474                                                   ResultRelInfo *resultRelInfo,
1475                                                   TupleTableSlot *slot,
1476                                                   TupleTableSlot *planSlot)
1477 {
1478         PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1479         Datum           datum;
1480         bool            isNull;
1481         const char **p_values;
1482         PGresult   *res;
1483         int                     n_rows;
1484
1485         /* Set up the prepared statement on the remote server, if we didn't yet */
1486         if (!fmstate->p_name)
1487                 prepare_foreign_modify(fmstate);
1488
1489         /* Get the ctid that was passed up as a resjunk column */
1490         datum = ExecGetJunkAttribute(planSlot,
1491                                                                  fmstate->ctidAttno,
1492                                                                  &isNull);
1493         /* shouldn't ever get a null result... */
1494         if (isNull)
1495                 elog(ERROR, "ctid is NULL");
1496
1497         /* Convert parameters needed by prepared statement to text form */
1498         p_values = convert_prep_stmt_params(fmstate,
1499                                                                                 (ItemPointer) DatumGetPointer(datum),
1500                                                                                 NULL);
1501
1502         /*
1503          * Execute the prepared statement, and check for success.
1504          *
1505          * We don't use a PG_TRY block here, so be careful not to throw error
1506          * without releasing the PGresult.
1507          */
1508         res = PQexecPrepared(fmstate->conn,
1509                                                  fmstate->p_name,
1510                                                  fmstate->p_nums,
1511                                                  p_values,
1512                                                  NULL,
1513                                                  NULL,
1514                                                  0);
1515         if (PQresultStatus(res) !=
1516                 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1517                 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1518
1519         /* Check number of rows affected, and fetch RETURNING tuple if any */
1520         if (fmstate->has_returning)
1521         {
1522                 n_rows = PQntuples(res);
1523                 if (n_rows > 0)
1524                         store_returning_result(fmstate, slot, res);
1525         }
1526         else
1527                 n_rows = atoi(PQcmdTuples(res));
1528
1529         /* And clean up */
1530         PQclear(res);
1531
1532         MemoryContextReset(fmstate->temp_cxt);
1533
1534         /* Return NULL if nothing was deleted on the remote end */
1535         return (n_rows > 0) ? slot : NULL;
1536 }
1537
1538 /*
1539  * postgresEndForeignModify
1540  *              Finish an insert/update/delete operation on a foreign table
1541  */
1542 static void
1543 postgresEndForeignModify(EState *estate,
1544                                                  ResultRelInfo *resultRelInfo)
1545 {
1546         PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1547
1548         /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1549         if (fmstate == NULL)
1550                 return;
1551
1552         /* If we created a prepared statement, destroy it */
1553         if (fmstate->p_name)
1554         {
1555                 char            sql[64];
1556                 PGresult   *res;
1557
1558                 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
1559
1560                 /*
1561                  * We don't use a PG_TRY block here, so be careful not to throw error
1562                  * without releasing the PGresult.
1563                  */
1564                 res = PQexec(fmstate->conn, sql);
1565                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1566                         pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
1567                 PQclear(res);
1568                 fmstate->p_name = NULL;
1569         }
1570
1571         /* Release remote connection */
1572         ReleaseConnection(fmstate->conn);
1573         fmstate->conn = NULL;
1574 }
1575
1576 /*
1577  * postgresIsForeignRelUpdatable
1578  *              Determine whether a foreign table supports INSERT, UPDATE and/or
1579  *              DELETE.
1580  */
1581 static int
1582 postgresIsForeignRelUpdatable(Relation rel)
1583 {
1584         bool            updatable;
1585         ForeignTable *table;
1586         ForeignServer *server;
1587         ListCell   *lc;
1588
1589         /*
1590          * By default, all postgres_fdw foreign tables are assumed updatable. This
1591          * can be overridden by a per-server setting, which in turn can be
1592          * overridden by a per-table setting.
1593          */
1594         updatable = true;
1595
1596         table = GetForeignTable(RelationGetRelid(rel));
1597         server = GetForeignServer(table->serverid);
1598
1599         foreach(lc, server->options)
1600         {
1601                 DefElem    *def = (DefElem *) lfirst(lc);
1602
1603                 if (strcmp(def->defname, "updatable") == 0)
1604                         updatable = defGetBoolean(def);
1605         }
1606         foreach(lc, table->options)
1607         {
1608                 DefElem    *def = (DefElem *) lfirst(lc);
1609
1610                 if (strcmp(def->defname, "updatable") == 0)
1611                         updatable = defGetBoolean(def);
1612         }
1613
1614         /*
1615          * Currently "updatable" means support for INSERT, UPDATE and DELETE.
1616          */
1617         return updatable ?
1618                 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
1619 }
1620
1621 /*
1622  * postgresExplainForeignScan
1623  *              Produce extra output for EXPLAIN of a ForeignScan on a foreign table
1624  */
1625 static void
1626 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
1627 {
1628         List       *fdw_private;
1629         char       *sql;
1630
1631         if (es->verbose)
1632         {
1633                 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
1634                 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
1635                 ExplainPropertyText("Remote SQL", sql, es);
1636         }
1637 }
1638
1639 /*
1640  * postgresExplainForeignModify
1641  *              Produce extra output for EXPLAIN of a ModifyTable on a foreign table
1642  */
1643 static void
1644 postgresExplainForeignModify(ModifyTableState *mtstate,
1645                                                          ResultRelInfo *rinfo,
1646                                                          List *fdw_private,
1647                                                          int subplan_index,
1648                                                          ExplainState *es)
1649 {
1650         if (es->verbose)
1651         {
1652                 char       *sql = strVal(list_nth(fdw_private,
1653                                                                                   FdwModifyPrivateUpdateSql));
1654
1655                 ExplainPropertyText("Remote SQL", sql, es);
1656         }
1657 }
1658
1659
1660 /*
1661  * estimate_path_cost_size
1662  *              Get cost and size estimates for a foreign scan
1663  *
1664  * We assume that all the baserestrictinfo clauses will be applied, plus
1665  * any join clauses listed in join_conds.
1666  */
1667 static void
1668 estimate_path_cost_size(PlannerInfo *root,
1669                                                 RelOptInfo *baserel,
1670                                                 List *join_conds,
1671                                                 double *p_rows, int *p_width,
1672                                                 Cost *p_startup_cost, Cost *p_total_cost)
1673 {
1674         PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1675         double          rows;
1676         double          retrieved_rows;
1677         int                     width;
1678         Cost            startup_cost;
1679         Cost            total_cost;
1680         Cost            run_cost;
1681         Cost            cpu_per_tuple;
1682
1683         /*
1684          * If the table or the server is configured to use remote estimates,
1685          * connect to the foreign server and execute EXPLAIN to estimate the
1686          * number of rows selected by the restriction+join clauses.  Otherwise,
1687          * estimate rows using whatever statistics we have locally, in a way
1688          * similar to ordinary tables.
1689          */
1690         if (fpinfo->use_remote_estimate)
1691         {
1692                 StringInfoData sql;
1693                 List       *retrieved_attrs;
1694                 PGconn     *conn;
1695
1696                 /*
1697                  * Construct EXPLAIN query including the desired SELECT, FROM, and
1698                  * WHERE clauses.  Params and other-relation Vars are replaced by
1699                  * dummy values.
1700                  */
1701                 initStringInfo(&sql);
1702                 appendStringInfoString(&sql, "EXPLAIN ");
1703                 deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
1704                                                  &retrieved_attrs);
1705                 if (fpinfo->remote_conds)
1706                         appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
1707                                                           true, NULL);
1708                 if (join_conds)
1709                         appendWhereClause(&sql, root, baserel, join_conds,
1710                                                           (fpinfo->remote_conds == NIL), NULL);
1711
1712                 /* Get the remote estimate */
1713                 conn = GetConnection(fpinfo->server, fpinfo->user, false);
1714                 get_remote_estimate(sql.data, conn, &rows, &width,
1715                                                         &startup_cost, &total_cost);
1716                 ReleaseConnection(conn);
1717
1718                 retrieved_rows = rows;
1719
1720                 /* Factor in the selectivity of the local_conds */
1721                 rows = clamp_row_est(rows * fpinfo->local_conds_sel);
1722
1723                 /* Add in the eval cost of the local_conds */
1724                 startup_cost += fpinfo->local_conds_cost.startup;
1725                 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
1726         }
1727         else
1728         {
1729                 /*
1730                  * We don't support join conditions in this mode (hence, no
1731                  * parameterized paths can be made).
1732                  */
1733                 Assert(join_conds == NIL);
1734
1735                 /* Use rows/width estimates made by set_baserel_size_estimates. */
1736                 rows = baserel->rows;
1737                 width = baserel->width;
1738
1739                 /*
1740                  * Back into an estimate of the number of retrieved rows.  Just in
1741                  * case this is nuts, clamp to at most baserel->tuples.
1742                  */
1743                 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
1744                 retrieved_rows = Min(retrieved_rows, baserel->tuples);
1745
1746                 /*
1747                  * Cost as though this were a seqscan, which is pessimistic.  We
1748                  * effectively imagine the local_conds are being evaluated remotely,
1749                  * too.
1750                  */
1751                 startup_cost = 0;
1752                 run_cost = 0;
1753                 run_cost += seq_page_cost * baserel->pages;
1754
1755                 startup_cost += baserel->baserestrictcost.startup;
1756                 cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple;
1757                 run_cost += cpu_per_tuple * baserel->tuples;
1758
1759                 total_cost = startup_cost + run_cost;
1760         }
1761
1762         /*
1763          * Add some additional cost factors to account for connection overhead
1764          * (fdw_startup_cost), transferring data across the network
1765          * (fdw_tuple_cost per retrieved row), and local manipulation of the data
1766          * (cpu_tuple_cost per retrieved row).
1767          */
1768         startup_cost += fpinfo->fdw_startup_cost;
1769         total_cost += fpinfo->fdw_startup_cost;
1770         total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
1771         total_cost += cpu_tuple_cost * retrieved_rows;
1772
1773         /* Return results. */
1774         *p_rows = rows;
1775         *p_width = width;
1776         *p_startup_cost = startup_cost;
1777         *p_total_cost = total_cost;
1778 }
1779
1780 /*
1781  * Estimate costs of executing a SQL statement remotely.
1782  * The given "sql" must be an EXPLAIN command.
1783  */
1784 static void
1785 get_remote_estimate(const char *sql, PGconn *conn,
1786                                         double *rows, int *width,
1787                                         Cost *startup_cost, Cost *total_cost)
1788 {
1789         PGresult   *volatile res = NULL;
1790
1791         /* PGresult must be released before leaving this function. */
1792         PG_TRY();
1793         {
1794                 char       *line;
1795                 char       *p;
1796                 int                     n;
1797
1798                 /*
1799                  * Execute EXPLAIN remotely.
1800                  */
1801                 res = PQexec(conn, sql);
1802                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1803                         pgfdw_report_error(ERROR, res, conn, false, sql);
1804
1805                 /*
1806                  * Extract cost numbers for topmost plan node.  Note we search for a
1807                  * left paren from the end of the line to avoid being confused by
1808                  * other uses of parentheses.
1809                  */
1810                 line = PQgetvalue(res, 0, 0);
1811                 p = strrchr(line, '(');
1812                 if (p == NULL)
1813                         elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
1814                 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
1815                                    startup_cost, total_cost, rows, width);
1816                 if (n != 4)
1817                         elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
1818
1819                 PQclear(res);
1820                 res = NULL;
1821         }
1822         PG_CATCH();
1823         {
1824                 if (res)
1825                         PQclear(res);
1826                 PG_RE_THROW();
1827         }
1828         PG_END_TRY();
1829 }
1830
1831 /*
1832  * Detect whether we want to process an EquivalenceClass member.
1833  *
1834  * This is a callback for use by generate_implied_equalities_for_column.
1835  */
1836 static bool
1837 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
1838                                                   EquivalenceClass *ec, EquivalenceMember *em,
1839                                                   void *arg)
1840 {
1841         ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
1842         Expr       *expr = em->em_expr;
1843
1844         /*
1845          * If we've identified what we're processing in the current scan, we only
1846          * want to match that expression.
1847          */
1848         if (state->current != NULL)
1849                 return equal(expr, state->current);
1850
1851         /*
1852          * Otherwise, ignore anything we've already processed.
1853          */
1854         if (list_member(state->already_used, expr))
1855                 return false;
1856
1857         /* This is the new target to process. */
1858         state->current = expr;
1859         return true;
1860 }
1861
1862 /*
1863  * Create cursor for node's query with current parameter values.
1864  */
1865 static void
1866 create_cursor(ForeignScanState *node)
1867 {
1868         PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1869         ExprContext *econtext = node->ss.ps.ps_ExprContext;
1870         int                     numParams = fsstate->numParams;
1871         const char **values = fsstate->param_values;
1872         PGconn     *conn = fsstate->conn;
1873         StringInfoData buf;
1874         PGresult   *res;
1875
1876         /*
1877          * Construct array of query parameter values in text format.  We do the
1878          * conversions in the short-lived per-tuple context, so as not to cause a
1879          * memory leak over repeated scans.
1880          */
1881         if (numParams > 0)
1882         {
1883                 int                     nestlevel;
1884                 MemoryContext oldcontext;
1885                 int                     i;
1886                 ListCell   *lc;
1887
1888                 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1889
1890                 nestlevel = set_transmission_modes();
1891
1892                 i = 0;
1893                 foreach(lc, fsstate->param_exprs)
1894                 {
1895                         ExprState  *expr_state = (ExprState *) lfirst(lc);
1896                         Datum           expr_value;
1897                         bool            isNull;
1898
1899                         /* Evaluate the parameter expression */
1900                         expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
1901
1902                         /*
1903                          * Get string representation of each parameter value by invoking
1904                          * type-specific output function, unless the value is null.
1905                          */
1906                         if (isNull)
1907                                 values[i] = NULL;
1908                         else
1909                                 values[i] = OutputFunctionCall(&fsstate->param_flinfo[i],
1910                                                                                            expr_value);
1911                         i++;
1912                 }
1913
1914                 reset_transmission_modes(nestlevel);
1915
1916                 MemoryContextSwitchTo(oldcontext);
1917         }
1918
1919         /* Construct the DECLARE CURSOR command */
1920         initStringInfo(&buf);
1921         appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
1922                                          fsstate->cursor_number, fsstate->query);
1923
1924         /*
1925          * Notice that we pass NULL for paramTypes, thus forcing the remote server
1926          * to infer types for all parameters.  Since we explicitly cast every
1927          * parameter (see deparse.c), the "inference" is trivial and will produce
1928          * the desired result.  This allows us to avoid assuming that the remote
1929          * server has the same OIDs we do for the parameters' types.
1930          *
1931          * We don't use a PG_TRY block here, so be careful not to throw error
1932          * without releasing the PGresult.
1933          */
1934         res = PQexecParams(conn, buf.data, numParams, NULL, values,
1935                                            NULL, NULL, 0);
1936         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1937                 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
1938         PQclear(res);
1939
1940         /* Mark the cursor as created, and show no tuples have been retrieved */
1941         fsstate->cursor_exists = true;
1942         fsstate->tuples = NULL;
1943         fsstate->num_tuples = 0;
1944         fsstate->next_tuple = 0;
1945         fsstate->fetch_ct_2 = 0;
1946         fsstate->eof_reached = false;
1947
1948         /* Clean up */
1949         pfree(buf.data);
1950 }
1951
1952 /*
1953  * Fetch some more rows from the node's cursor.
1954  */
1955 static void
1956 fetch_more_data(ForeignScanState *node)
1957 {
1958         PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1959         PGresult   *volatile res = NULL;
1960         MemoryContext oldcontext;
1961
1962         /*
1963          * We'll store the tuples in the batch_cxt.  First, flush the previous
1964          * batch.
1965          */
1966         fsstate->tuples = NULL;
1967         MemoryContextReset(fsstate->batch_cxt);
1968         oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
1969
1970         /* PGresult must be released before leaving this function. */
1971         PG_TRY();
1972         {
1973                 PGconn     *conn = fsstate->conn;
1974                 char            sql[64];
1975                 int                     fetch_size;
1976                 int                     numrows;
1977                 int                     i;
1978
1979                 /* The fetch size is arbitrary, but shouldn't be enormous. */
1980                 fetch_size = 100;
1981
1982                 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
1983                                  fetch_size, fsstate->cursor_number);
1984
1985                 res = PQexec(conn, sql);
1986                 /* On error, report the original query, not the FETCH. */
1987                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1988                         pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
1989
1990                 /* Convert the data into HeapTuples */
1991                 numrows = PQntuples(res);
1992                 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
1993                 fsstate->num_tuples = numrows;
1994                 fsstate->next_tuple = 0;
1995
1996                 for (i = 0; i < numrows; i++)
1997                 {
1998                         fsstate->tuples[i] =
1999                                 make_tuple_from_result_row(res, i,
2000                                                                                    fsstate->rel,
2001                                                                                    fsstate->attinmeta,
2002                                                                                    fsstate->retrieved_attrs,
2003                                                                                    fsstate->temp_cxt);
2004                 }
2005
2006                 /* Update fetch_ct_2 */
2007                 if (fsstate->fetch_ct_2 < 2)
2008                         fsstate->fetch_ct_2++;
2009
2010                 /* Must be EOF if we didn't get as many tuples as we asked for. */
2011                 fsstate->eof_reached = (numrows < fetch_size);
2012
2013                 PQclear(res);
2014                 res = NULL;
2015         }
2016         PG_CATCH();
2017         {
2018                 if (res)
2019                         PQclear(res);
2020                 PG_RE_THROW();
2021         }
2022         PG_END_TRY();
2023
2024         MemoryContextSwitchTo(oldcontext);
2025 }
2026
2027 /*
2028  * Force assorted GUC parameters to settings that ensure that we'll output
2029  * data values in a form that is unambiguous to the remote server.
2030  *
2031  * This is rather expensive and annoying to do once per row, but there's
2032  * little choice if we want to be sure values are transmitted accurately;
2033  * we can't leave the settings in place between rows for fear of affecting
2034  * user-visible computations.
2035  *
2036  * We use the equivalent of a function SET option to allow the settings to
2037  * persist only until the caller calls reset_transmission_modes().      If an
2038  * error is thrown in between, guc.c will take care of undoing the settings.
2039  *
2040  * The return value is the nestlevel that must be passed to
2041  * reset_transmission_modes() to undo things.
2042  */
2043 int
2044 set_transmission_modes(void)
2045 {
2046         int                     nestlevel = NewGUCNestLevel();
2047
2048         /*
2049          * The values set here should match what pg_dump does.  See also
2050          * configure_remote_session in connection.c.
2051          */
2052         if (DateStyle != USE_ISO_DATES)
2053                 (void) set_config_option("datestyle", "ISO",
2054                                                                  PGC_USERSET, PGC_S_SESSION,
2055                                                                  GUC_ACTION_SAVE, true, 0);
2056         if (IntervalStyle != INTSTYLE_POSTGRES)
2057                 (void) set_config_option("intervalstyle", "postgres",
2058                                                                  PGC_USERSET, PGC_S_SESSION,
2059                                                                  GUC_ACTION_SAVE, true, 0);
2060         if (extra_float_digits < 3)
2061                 (void) set_config_option("extra_float_digits", "3",
2062                                                                  PGC_USERSET, PGC_S_SESSION,
2063                                                                  GUC_ACTION_SAVE, true, 0);
2064
2065         return nestlevel;
2066 }
2067
2068 /*
2069  * Undo the effects of set_transmission_modes().
2070  */
2071 void
2072 reset_transmission_modes(int nestlevel)
2073 {
2074         AtEOXact_GUC(true, nestlevel);
2075 }
2076
2077 /*
2078  * Utility routine to close a cursor.
2079  */
2080 static void
2081 close_cursor(PGconn *conn, unsigned int cursor_number)
2082 {
2083         char            sql[64];
2084         PGresult   *res;
2085
2086         snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
2087
2088         /*
2089          * We don't use a PG_TRY block here, so be careful not to throw error
2090          * without releasing the PGresult.
2091          */
2092         res = PQexec(conn, sql);
2093         if (PQresultStatus(res) != PGRES_COMMAND_OK)
2094                 pgfdw_report_error(ERROR, res, conn, true, sql);
2095         PQclear(res);
2096 }
2097
2098 /*
2099  * prepare_foreign_modify
2100  *              Establish a prepared statement for execution of INSERT/UPDATE/DELETE
2101  */
2102 static void
2103 prepare_foreign_modify(PgFdwModifyState *fmstate)
2104 {
2105         char            prep_name[NAMEDATALEN];
2106         char       *p_name;
2107         PGresult   *res;
2108
2109         /* Construct name we'll use for the prepared statement. */
2110         snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
2111                          GetPrepStmtNumber(fmstate->conn));
2112         p_name = pstrdup(prep_name);
2113
2114         /*
2115          * We intentionally do not specify parameter types here, but leave the
2116          * remote server to derive them by default.  This avoids possible problems
2117          * with the remote server using different type OIDs than we do.  All of
2118          * the prepared statements we use in this module are simple enough that
2119          * the remote server will make the right choices.
2120          *
2121          * We don't use a PG_TRY block here, so be careful not to throw error
2122          * without releasing the PGresult.
2123          */
2124         res = PQprepare(fmstate->conn,
2125                                         p_name,
2126                                         fmstate->query,
2127                                         0,
2128                                         NULL);
2129
2130         if (PQresultStatus(res) != PGRES_COMMAND_OK)
2131                 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
2132         PQclear(res);
2133
2134         /* This action shows that the prepare has been done. */
2135         fmstate->p_name = p_name;
2136 }
2137
2138 /*
2139  * convert_prep_stmt_params
2140  *              Create array of text strings representing parameter values
2141  *
2142  * tupleid is ctid to send, or NULL if none
2143  * slot is slot to get remaining parameters from, or NULL if none
2144  *
2145  * Data is constructed in temp_cxt; caller should reset that after use.
2146  */
2147 static const char **
2148 convert_prep_stmt_params(PgFdwModifyState *fmstate,
2149                                                  ItemPointer tupleid,
2150                                                  TupleTableSlot *slot)
2151 {
2152         const char **p_values;
2153         int                     pindex = 0;
2154         MemoryContext oldcontext;
2155
2156         oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
2157
2158         p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
2159
2160         /* 1st parameter should be ctid, if it's in use */
2161         if (tupleid != NULL)
2162         {
2163                 /* don't need set_transmission_modes for TID output */
2164                 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
2165                                                                                           PointerGetDatum(tupleid));
2166                 pindex++;
2167         }
2168
2169         /* get following parameters from slot */
2170         if (slot != NULL && fmstate->target_attrs != NIL)
2171         {
2172                 int                     nestlevel;
2173                 ListCell   *lc;
2174
2175                 nestlevel = set_transmission_modes();
2176
2177                 foreach(lc, fmstate->target_attrs)
2178                 {
2179                         int                     attnum = lfirst_int(lc);
2180                         Datum           value;
2181                         bool            isnull;
2182
2183                         value = slot_getattr(slot, attnum, &isnull);
2184                         if (isnull)
2185                                 p_values[pindex] = NULL;
2186                         else
2187                                 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
2188                                                                                                           value);
2189                         pindex++;
2190                 }
2191
2192                 reset_transmission_modes(nestlevel);
2193         }
2194
2195         Assert(pindex == fmstate->p_nums);
2196
2197         MemoryContextSwitchTo(oldcontext);
2198
2199         return p_values;
2200 }
2201
2202 /*
2203  * store_returning_result
2204  *              Store the result of a RETURNING clause
2205  *
2206  * On error, be sure to release the PGresult on the way out.  Callers do not
2207  * have PG_TRY blocks to ensure this happens.
2208  */
2209 static void
2210 store_returning_result(PgFdwModifyState *fmstate,
2211                                            TupleTableSlot *slot, PGresult *res)
2212 {
2213         /* PGresult must be released before leaving this function. */
2214         PG_TRY();
2215         {
2216                 HeapTuple       newtup;
2217
2218                 newtup = make_tuple_from_result_row(res, 0,
2219                                                                                         fmstate->rel,
2220                                                                                         fmstate->attinmeta,
2221                                                                                         fmstate->retrieved_attrs,
2222                                                                                         fmstate->temp_cxt);
2223                 /* tuple will be deleted when it is cleared from the slot */
2224                 ExecStoreTuple(newtup, slot, InvalidBuffer, true);
2225         }
2226         PG_CATCH();
2227         {
2228                 if (res)
2229                         PQclear(res);
2230                 PG_RE_THROW();
2231         }
2232         PG_END_TRY();
2233 }
2234
2235 /*
2236  * postgresAnalyzeForeignTable
2237  *              Test whether analyzing this foreign table is supported
2238  */
2239 static bool
2240 postgresAnalyzeForeignTable(Relation relation,
2241                                                         AcquireSampleRowsFunc *func,
2242                                                         BlockNumber *totalpages)
2243 {
2244         ForeignTable *table;
2245         ForeignServer *server;
2246         UserMapping *user;
2247         PGconn     *conn;
2248         StringInfoData sql;
2249         PGresult   *volatile res = NULL;
2250
2251         /* Return the row-analysis function pointer */
2252         *func = postgresAcquireSampleRowsFunc;
2253
2254         /*
2255          * Now we have to get the number of pages.      It's annoying that the ANALYZE
2256          * API requires us to return that now, because it forces some duplication
2257          * of effort between this routine and postgresAcquireSampleRowsFunc.  But
2258          * it's probably not worth redefining that API at this point.
2259          */
2260
2261         /*
2262          * Get the connection to use.  We do the remote access as the table's
2263          * owner, even if the ANALYZE was started by some other user.
2264          */
2265         table = GetForeignTable(RelationGetRelid(relation));
2266         server = GetForeignServer(table->serverid);
2267         user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
2268         conn = GetConnection(server, user, false);
2269
2270         /*
2271          * Construct command to get page count for relation.
2272          */
2273         initStringInfo(&sql);
2274         deparseAnalyzeSizeSql(&sql, relation);
2275
2276         /* In what follows, do not risk leaking any PGresults. */
2277         PG_TRY();
2278         {
2279                 res = PQexec(conn, sql.data);
2280                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2281                         pgfdw_report_error(ERROR, res, conn, false, sql.data);
2282
2283                 if (PQntuples(res) != 1 || PQnfields(res) != 1)
2284                         elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
2285                 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2286
2287                 PQclear(res);
2288                 res = NULL;
2289         }
2290         PG_CATCH();
2291         {
2292                 if (res)
2293                         PQclear(res);
2294                 PG_RE_THROW();
2295         }
2296         PG_END_TRY();
2297
2298         ReleaseConnection(conn);
2299
2300         return true;
2301 }
2302
2303 /*
2304  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
2305  *
2306  * We fetch the whole table from the remote side and pick out some sample rows.
2307  *
2308  * Selected rows are returned in the caller-allocated array rows[],
2309  * which must have at least targrows entries.
2310  * The actual number of rows selected is returned as the function result.
2311  * We also count the total number of rows in the table and return it into
2312  * *totalrows.  Note that *totaldeadrows is always set to 0.
2313  *
2314  * Note that the returned list of rows is not always in order by physical
2315  * position in the table.  Therefore, correlation estimates derived later
2316  * may be meaningless, but it's OK because we don't use the estimates
2317  * currently (the planner only pays attention to correlation for indexscans).
2318  */
2319 static int
2320 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
2321                                                           HeapTuple *rows, int targrows,
2322                                                           double *totalrows,
2323                                                           double *totaldeadrows)
2324 {
2325         PgFdwAnalyzeState astate;
2326         ForeignTable *table;
2327         ForeignServer *server;
2328         UserMapping *user;
2329         PGconn     *conn;
2330         unsigned int cursor_number;
2331         StringInfoData sql;
2332         PGresult   *volatile res = NULL;
2333
2334         /* Initialize workspace state */
2335         astate.rel = relation;
2336         astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
2337
2338         astate.rows = rows;
2339         astate.targrows = targrows;
2340         astate.numrows = 0;
2341         astate.samplerows = 0;
2342         astate.rowstoskip = -1;         /* -1 means not set yet */
2343         astate.rstate = anl_init_selection_state(targrows);
2344
2345         /* Remember ANALYZE context, and create a per-tuple temp context */
2346         astate.anl_cxt = CurrentMemoryContext;
2347         astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
2348                                                                                         "postgres_fdw temporary data",
2349                                                                                         ALLOCSET_SMALL_MINSIZE,
2350                                                                                         ALLOCSET_SMALL_INITSIZE,
2351                                                                                         ALLOCSET_SMALL_MAXSIZE);
2352
2353         /*
2354          * Get the connection to use.  We do the remote access as the table's
2355          * owner, even if the ANALYZE was started by some other user.
2356          */
2357         table = GetForeignTable(RelationGetRelid(relation));
2358         server = GetForeignServer(table->serverid);
2359         user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
2360         conn = GetConnection(server, user, false);
2361
2362         /*
2363          * Construct cursor that retrieves whole rows from remote.
2364          */
2365         cursor_number = GetCursorNumber(conn);
2366         initStringInfo(&sql);
2367         appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
2368         deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
2369
2370         /* In what follows, do not risk leaking any PGresults. */
2371         PG_TRY();
2372         {
2373                 res = PQexec(conn, sql.data);
2374                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2375                         pgfdw_report_error(ERROR, res, conn, false, sql.data);
2376                 PQclear(res);
2377                 res = NULL;
2378
2379                 /* Retrieve and process rows a batch at a time. */
2380                 for (;;)
2381                 {
2382                         char            fetch_sql[64];
2383                         int                     fetch_size;
2384                         int                     numrows;
2385                         int                     i;
2386
2387                         /* Allow users to cancel long query */
2388                         CHECK_FOR_INTERRUPTS();
2389
2390                         /*
2391                          * XXX possible future improvement: if rowstoskip is large, we
2392                          * could issue a MOVE rather than physically fetching the rows,
2393                          * then just adjust rowstoskip and samplerows appropriately.
2394                          */
2395
2396                         /* The fetch size is arbitrary, but shouldn't be enormous. */
2397                         fetch_size = 100;
2398
2399                         /* Fetch some rows */
2400                         snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
2401                                          fetch_size, cursor_number);
2402
2403                         res = PQexec(conn, fetch_sql);
2404                         /* On error, report the original query, not the FETCH. */
2405                         if (PQresultStatus(res) != PGRES_TUPLES_OK)
2406                                 pgfdw_report_error(ERROR, res, conn, false, sql.data);
2407
2408                         /* Process whatever we got. */
2409                         numrows = PQntuples(res);
2410                         for (i = 0; i < numrows; i++)
2411                                 analyze_row_processor(res, i, &astate);
2412
2413                         PQclear(res);
2414                         res = NULL;
2415
2416                         /* Must be EOF if we didn't get all the rows requested. */
2417                         if (numrows < fetch_size)
2418                                 break;
2419                 }
2420
2421                 /* Close the cursor, just to be tidy. */
2422                 close_cursor(conn, cursor_number);
2423         }
2424         PG_CATCH();
2425         {
2426                 if (res)
2427                         PQclear(res);
2428                 PG_RE_THROW();
2429         }
2430         PG_END_TRY();
2431
2432         ReleaseConnection(conn);
2433
2434         /* We assume that we have no dead tuple. */
2435         *totaldeadrows = 0.0;
2436
2437         /* We've retrieved all living tuples from foreign server. */
2438         *totalrows = astate.samplerows;
2439
2440         /*
2441          * Emit some interesting relation info
2442          */
2443         ereport(elevel,
2444                         (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
2445                                         RelationGetRelationName(relation),
2446                                         astate.samplerows, astate.numrows)));
2447
2448         return astate.numrows;
2449 }
2450
2451 /*
2452  * Collect sample rows from the result of query.
2453  *       - Use all tuples in sample until target # of samples are collected.
2454  *       - Subsequently, replace already-sampled tuples randomly.
2455  */
2456 static void
2457 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
2458 {
2459         int                     targrows = astate->targrows;
2460         int                     pos;                    /* array index to store tuple in */
2461         MemoryContext oldcontext;
2462
2463         /* Always increment sample row counter. */
2464         astate->samplerows += 1;
2465
2466         /*
2467          * Determine the slot where this sample row should be stored.  Set pos to
2468          * negative value to indicate the row should be skipped.
2469          */
2470         if (astate->numrows < targrows)
2471         {
2472                 /* First targrows rows are always included into the sample */
2473                 pos = astate->numrows++;
2474         }
2475         else
2476         {
2477                 /*
2478                  * Now we start replacing tuples in the sample until we reach the end
2479                  * of the relation.  Same algorithm as in acquire_sample_rows in
2480                  * analyze.c; see Jeff Vitter's paper.
2481                  */
2482                 if (astate->rowstoskip < 0)
2483                         astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows,
2484                                                                                                 &astate->rstate);
2485
2486                 if (astate->rowstoskip <= 0)
2487                 {
2488                         /* Choose a random reservoir element to replace. */
2489                         pos = (int) (targrows * anl_random_fract());
2490                         Assert(pos >= 0 && pos < targrows);
2491                         heap_freetuple(astate->rows[pos]);
2492                 }
2493                 else
2494                 {
2495                         /* Skip this tuple. */
2496                         pos = -1;
2497                 }
2498
2499                 astate->rowstoskip -= 1;
2500         }
2501
2502         if (pos >= 0)
2503         {
2504                 /*
2505                  * Create sample tuple from current result row, and store it in the
2506                  * position determined above.  The tuple has to be created in anl_cxt.
2507                  */
2508                 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
2509
2510                 astate->rows[pos] = make_tuple_from_result_row(res, row,
2511                                                                                                            astate->rel,
2512                                                                                                            astate->attinmeta,
2513                                                                                                          astate->retrieved_attrs,
2514                                                                                                            astate->temp_cxt);
2515
2516                 MemoryContextSwitchTo(oldcontext);
2517         }
2518 }
2519
2520 /*
2521  * Create a tuple from the specified row of the PGresult.
2522  *
2523  * rel is the local representation of the foreign table, attinmeta is
2524  * conversion data for the rel's tupdesc, and retrieved_attrs is an
2525  * integer list of the table column numbers present in the PGresult.
2526  * temp_context is a working context that can be reset after each tuple.
2527  */
2528 static HeapTuple
2529 make_tuple_from_result_row(PGresult *res,
2530                                                    int row,
2531                                                    Relation rel,
2532                                                    AttInMetadata *attinmeta,
2533                                                    List *retrieved_attrs,
2534                                                    MemoryContext temp_context)
2535 {
2536         HeapTuple       tuple;
2537         TupleDesc       tupdesc = RelationGetDescr(rel);
2538         Datum      *values;
2539         bool       *nulls;
2540         ItemPointer ctid = NULL;
2541         ConversionLocation errpos;
2542         ErrorContextCallback errcallback;
2543         MemoryContext oldcontext;
2544         ListCell   *lc;
2545         int                     j;
2546
2547         Assert(row < PQntuples(res));
2548
2549         /*
2550          * Do the following work in a temp context that we reset after each tuple.
2551          * This cleans up not only the data we have direct access to, but any
2552          * cruft the I/O functions might leak.
2553          */
2554         oldcontext = MemoryContextSwitchTo(temp_context);
2555
2556         values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
2557         nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
2558         /* Initialize to nulls for any columns not present in result */
2559         memset(nulls, true, tupdesc->natts * sizeof(bool));
2560
2561         /*
2562          * Set up and install callback to report where conversion error occurs.
2563          */
2564         errpos.rel = rel;
2565         errpos.cur_attno = 0;
2566         errcallback.callback = conversion_error_callback;
2567         errcallback.arg = (void *) &errpos;
2568         errcallback.previous = error_context_stack;
2569         error_context_stack = &errcallback;
2570
2571         /*
2572          * i indexes columns in the relation, j indexes columns in the PGresult.
2573          */
2574         j = 0;
2575         foreach(lc, retrieved_attrs)
2576         {
2577                 int                     i = lfirst_int(lc);
2578                 char       *valstr;
2579
2580                 /* fetch next column's textual value */
2581                 if (PQgetisnull(res, row, j))
2582                         valstr = NULL;
2583                 else
2584                         valstr = PQgetvalue(res, row, j);
2585
2586                 /* convert value to internal representation */
2587                 if (i > 0)
2588                 {
2589                         /* ordinary column */
2590                         Assert(i <= tupdesc->natts);
2591                         nulls[i - 1] = (valstr == NULL);
2592                         /* Apply the input function even to nulls, to support domains */
2593                         errpos.cur_attno = i;
2594                         values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
2595                                                                                           valstr,
2596                                                                                           attinmeta->attioparams[i - 1],
2597                                                                                           attinmeta->atttypmods[i - 1]);
2598                         errpos.cur_attno = 0;
2599                 }
2600                 else if (i == SelfItemPointerAttributeNumber)
2601                 {
2602                         /* ctid --- note we ignore any other system column in result */
2603                         if (valstr != NULL)
2604                         {
2605                                 Datum           datum;
2606
2607                                 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
2608                                 ctid = (ItemPointer) DatumGetPointer(datum);
2609                         }
2610                 }
2611
2612                 j++;
2613         }
2614
2615         /* Uninstall error context callback. */
2616         error_context_stack = errcallback.previous;
2617
2618         /*
2619          * Check we got the expected number of columns.  Note: j == 0 and
2620          * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
2621          */
2622         if (j > 0 && j != PQnfields(res))
2623                 elog(ERROR, "remote query result does not match the foreign table");
2624
2625         /*
2626          * Build the result tuple in caller's memory context.
2627          */
2628         MemoryContextSwitchTo(oldcontext);
2629
2630         tuple = heap_form_tuple(tupdesc, values, nulls);
2631
2632         if (ctid)
2633                 tuple->t_self = *ctid;
2634
2635         /* Clean up */
2636         MemoryContextReset(temp_context);
2637
2638         return tuple;
2639 }
2640
2641 /*
2642  * Callback function which is called when error occurs during column value
2643  * conversion.  Print names of column and relation.
2644  */
2645 static void
2646 conversion_error_callback(void *arg)
2647 {
2648         ConversionLocation *errpos = (ConversionLocation *) arg;
2649         TupleDesc       tupdesc = RelationGetDescr(errpos->rel);
2650
2651         if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
2652                 errcontext("column \"%s\" of foreign table \"%s\"",
2653                                    NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
2654                                    RelationGetRelationName(errpos->rel));
2655 }