]> granicus.if.org Git - postgresql/blob - contrib/file_fdw/file_fdw.c
Add macros to make AllocSetContextCreate() calls simpler and safer.
[postgresql] / contrib / file_fdw / file_fdw.c
1 /*-------------------------------------------------------------------------
2  *
3  * file_fdw.c
4  *                foreign-data wrapper for server-side flat files.
5  *
6  * Copyright (c) 2010-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *                contrib/file_fdw/file_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14
15 #include <sys/stat.h>
16 #include <unistd.h>
17
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "catalog/pg_foreign_table.h"
22 #include "commands/copy.h"
23 #include "commands/defrem.h"
24 #include "commands/explain.h"
25 #include "commands/vacuum.h"
26 #include "foreign/fdwapi.h"
27 #include "foreign/foreign.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/pathnode.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 #include "utils/sampling.h"
38
/*
 * Module magic block required of every PostgreSQL loadable module (defined in
 * fmgr.h); the server checks it when the library is loaded.
 */
PG_MODULE_MAGIC;
40
41 /*
42  * Describes the valid options for objects that use this wrapper.
43  */
44 struct FileFdwOption
45 {
46         const char *optname;
47         Oid                     optcontext;             /* Oid of catalog in which option may appear */
48 };
49
50 /*
51  * Valid options for file_fdw.
52  * These options are based on the options for the COPY FROM command.
53  * But note that force_not_null and force_null are handled as boolean options
54  * attached to a column, not as table options.
55  *
56  * Note: If you are adding new option for user mapping, you need to modify
57  * fileGetOptions(), which currently doesn't bother to look at user mappings.
58  */
59 static const struct FileFdwOption valid_options[] = {
60         /* File options */
61         {"filename", ForeignTableRelationId},
62
63         /* Format options */
64         /* oids option is not supported */
65         {"format", ForeignTableRelationId},
66         {"header", ForeignTableRelationId},
67         {"delimiter", ForeignTableRelationId},
68         {"quote", ForeignTableRelationId},
69         {"escape", ForeignTableRelationId},
70         {"null", ForeignTableRelationId},
71         {"encoding", ForeignTableRelationId},
72         {"force_not_null", AttributeRelationId},
73         {"force_null", AttributeRelationId},
74
75         /*
76          * force_quote is not supported by file_fdw because it's for COPY TO.
77          */
78
79         /* Sentinel */
80         {NULL, InvalidOid}
81 };
82
83 /*
84  * FDW-specific information for RelOptInfo.fdw_private.
85  */
86 typedef struct FileFdwPlanState
87 {
88         char       *filename;           /* file to read */
89         List       *options;            /* merged COPY options, excluding filename */
90         BlockNumber pages;                      /* estimate of file's physical size */
91         double          ntuples;                /* estimate of number of rows in file */
92 } FileFdwPlanState;
93
94 /*
95  * FDW-specific information for ForeignScanState.fdw_state.
96  */
97 typedef struct FileFdwExecutionState
98 {
99         char       *filename;           /* file to read */
100         List       *options;            /* merged COPY options, excluding filename */
101         CopyState       cstate;                 /* state of reading file */
102 } FileFdwExecutionState;
103
/*
 * SQL-callable functions, declared for the version-1 calling convention:
 * the FDW handler and the option validator.
 */
PG_FUNCTION_INFO_V1(file_fdw_handler);
PG_FUNCTION_INFO_V1(file_fdw_validator);
109
/*
 * FDW callback routines.  file_fdw_handler() installs each of these in the
 * FdwRoutine struct it returns.
 */
static void fileGetForeignRelSize(PlannerInfo *root,
					  RelOptInfo *baserel,
					  Oid foreigntableid);
static void fileGetForeignPaths(PlannerInfo *root,
					RelOptInfo *baserel,
					Oid foreigntableid);
static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
				   RelOptInfo *baserel,
				   Oid foreigntableid,
				   ForeignPath *best_path,
				   List *tlist,
				   List *scan_clauses,
				   Plan *outer_plan);
static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
static void fileBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
static void fileReScanForeignScan(ForeignScanState *node);
static void fileEndForeignScan(ForeignScanState *node);
static bool fileAnalyzeForeignTable(Relation relation,
						AcquireSampleRowsFunc *func,
						BlockNumber *totalpages);
static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
							  RangeTblEntry *rte);
136
/*
 * Helper functions: option validation and retrieval, planner size/cost
 * estimation, and row sampling for ANALYZE.
 */
static bool is_valid_option(const char *option, Oid context);
static void fileGetOptions(Oid foreigntableid,
			   char **filename, List **other_options);
static List *get_file_fdw_attribute_options(Oid relid);
static bool check_selective_binary_conversion(RelOptInfo *baserel,
								  Oid foreigntableid,
								  List **columns);
static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
			  FileFdwPlanState *fdw_private);
static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
			   FileFdwPlanState *fdw_private,
			   Cost *startup_cost, Cost *total_cost);
static int file_acquire_sample_rows(Relation onerel, int elevel,
						 HeapTuple *rows, int targrows,
						 double *totalrows, double *totaldeadrows);
155
156
157 /*
158  * Foreign-data wrapper handler function: return a struct with pointers
159  * to my callback routines.
160  */
161 Datum
162 file_fdw_handler(PG_FUNCTION_ARGS)
163 {
164         FdwRoutine *fdwroutine = makeNode(FdwRoutine);
165
166         fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
167         fdwroutine->GetForeignPaths = fileGetForeignPaths;
168         fdwroutine->GetForeignPlan = fileGetForeignPlan;
169         fdwroutine->ExplainForeignScan = fileExplainForeignScan;
170         fdwroutine->BeginForeignScan = fileBeginForeignScan;
171         fdwroutine->IterateForeignScan = fileIterateForeignScan;
172         fdwroutine->ReScanForeignScan = fileReScanForeignScan;
173         fdwroutine->EndForeignScan = fileEndForeignScan;
174         fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
175         fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
176
177         PG_RETURN_POINTER(fdwroutine);
178 }
179
180 /*
181  * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
182  * USER MAPPING or FOREIGN TABLE that uses file_fdw.
183  *
184  * Raise an ERROR if the option or its value is considered invalid.
185  */
186 Datum
187 file_fdw_validator(PG_FUNCTION_ARGS)
188 {
189         List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
190         Oid                     catalog = PG_GETARG_OID(1);
191         char       *filename = NULL;
192         DefElem    *force_not_null = NULL;
193         DefElem    *force_null = NULL;
194         List       *other_options = NIL;
195         ListCell   *cell;
196
197         /*
198          * Only superusers are allowed to set options of a file_fdw foreign table.
199          * This is because the filename is one of those options, and we don't want
200          * non-superusers to be able to determine which file gets read.
201          *
202          * Putting this sort of permissions check in a validator is a bit of a
203          * crock, but there doesn't seem to be any other place that can enforce
204          * the check more cleanly.
205          *
206          * Note that the valid_options[] array disallows setting filename at any
207          * options level other than foreign table --- otherwise there'd still be a
208          * security hole.
209          */
210         if (catalog == ForeignTableRelationId && !superuser())
211                 ereport(ERROR,
212                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
213                                  errmsg("only superuser can change options of a file_fdw foreign table")));
214
215         /*
216          * Check that only options supported by file_fdw, and allowed for the
217          * current object type, are given.
218          */
219         foreach(cell, options_list)
220         {
221                 DefElem    *def = (DefElem *) lfirst(cell);
222
223                 if (!is_valid_option(def->defname, catalog))
224                 {
225                         const struct FileFdwOption *opt;
226                         StringInfoData buf;
227
228                         /*
229                          * Unknown option specified, complain about it. Provide a hint
230                          * with list of valid options for the object.
231                          */
232                         initStringInfo(&buf);
233                         for (opt = valid_options; opt->optname; opt++)
234                         {
235                                 if (catalog == opt->optcontext)
236                                         appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
237                                                                          opt->optname);
238                         }
239
240                         ereport(ERROR,
241                                         (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
242                                          errmsg("invalid option \"%s\"", def->defname),
243                                          buf.len > 0
244                                          ? errhint("Valid options in this context are: %s",
245                                                            buf.data)
246                                   : errhint("There are no valid options in this context.")));
247                 }
248
249                 /*
250                  * Separate out filename and column-specific options, since
251                  * ProcessCopyOptions won't accept them.
252                  */
253
254                 if (strcmp(def->defname, "filename") == 0)
255                 {
256                         if (filename)
257                                 ereport(ERROR,
258                                                 (errcode(ERRCODE_SYNTAX_ERROR),
259                                                  errmsg("conflicting or redundant options")));
260                         filename = defGetString(def);
261                 }
262
263                 /*
264                  * force_not_null is a boolean option; after validation we can discard
265                  * it - it will be retrieved later in get_file_fdw_attribute_options()
266                  */
267                 else if (strcmp(def->defname, "force_not_null") == 0)
268                 {
269                         if (force_not_null)
270                                 ereport(ERROR,
271                                                 (errcode(ERRCODE_SYNTAX_ERROR),
272                                                  errmsg("conflicting or redundant options"),
273                                                  errhint("option \"force_not_null\" supplied more than once for a column")));
274                         force_not_null = def;
275                         /* Don't care what the value is, as long as it's a legal boolean */
276                         (void) defGetBoolean(def);
277                 }
278                 /* See comments for force_not_null above */
279                 else if (strcmp(def->defname, "force_null") == 0)
280                 {
281                         if (force_null)
282                                 ereport(ERROR,
283                                                 (errcode(ERRCODE_SYNTAX_ERROR),
284                                                  errmsg("conflicting or redundant options"),
285                                                  errhint("option \"force_null\" supplied more than once for a column")));
286                         force_null = def;
287                         (void) defGetBoolean(def);
288                 }
289                 else
290                         other_options = lappend(other_options, def);
291         }
292
293         /*
294          * Now apply the core COPY code's validation logic for more checks.
295          */
296         ProcessCopyOptions(NULL, true, other_options);
297
298         /*
299          * Filename option is required for file_fdw foreign tables.
300          */
301         if (catalog == ForeignTableRelationId && filename == NULL)
302                 ereport(ERROR,
303                                 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
304                                  errmsg("filename is required for file_fdw foreign tables")));
305
306         PG_RETURN_VOID();
307 }
308
309 /*
310  * Check if the provided option is one of the valid options.
311  * context is the Oid of the catalog holding the object the option is for.
312  */
313 static bool
314 is_valid_option(const char *option, Oid context)
315 {
316         const struct FileFdwOption *opt;
317
318         for (opt = valid_options; opt->optname; opt++)
319         {
320                 if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
321                         return true;
322         }
323         return false;
324 }
325
326 /*
327  * Fetch the options for a file_fdw foreign table.
328  *
329  * We have to separate out "filename" from the other options because
330  * it must not appear in the options list passed to the core COPY code.
331  */
332 static void
333 fileGetOptions(Oid foreigntableid,
334                            char **filename, List **other_options)
335 {
336         ForeignTable *table;
337         ForeignServer *server;
338         ForeignDataWrapper *wrapper;
339         List       *options;
340         ListCell   *lc,
341                            *prev;
342
343         /*
344          * Extract options from FDW objects.  We ignore user mappings because
345          * file_fdw doesn't have any options that can be specified there.
346          *
347          * (XXX Actually, given the current contents of valid_options[], there's
348          * no point in examining anything except the foreign table's own options.
349          * Simplify?)
350          */
351         table = GetForeignTable(foreigntableid);
352         server = GetForeignServer(table->serverid);
353         wrapper = GetForeignDataWrapper(server->fdwid);
354
355         options = NIL;
356         options = list_concat(options, wrapper->options);
357         options = list_concat(options, server->options);
358         options = list_concat(options, table->options);
359         options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
360
361         /*
362          * Separate out the filename.
363          */
364         *filename = NULL;
365         prev = NULL;
366         foreach(lc, options)
367         {
368                 DefElem    *def = (DefElem *) lfirst(lc);
369
370                 if (strcmp(def->defname, "filename") == 0)
371                 {
372                         *filename = defGetString(def);
373                         options = list_delete_cell(options, lc, prev);
374                         break;
375                 }
376                 prev = lc;
377         }
378
379         /*
380          * The validator should have checked that a filename was included in the
381          * options, but check again, just in case.
382          */
383         if (*filename == NULL)
384                 elog(ERROR, "filename is required for file_fdw foreign tables");
385
386         *other_options = options;
387 }
388
389 /*
390  * Retrieve per-column generic options from pg_attribute and construct a list
391  * of DefElems representing them.
392  *
393  * At the moment we only have "force_not_null", and "force_null",
394  * which should each be combined into a single DefElem listing all such
395  * columns, since that's what COPY expects.
396  */
397 static List *
398 get_file_fdw_attribute_options(Oid relid)
399 {
400         Relation        rel;
401         TupleDesc       tupleDesc;
402         AttrNumber      natts;
403         AttrNumber      attnum;
404         List       *fnncolumns = NIL;
405         List       *fncolumns = NIL;
406
407         List       *options = NIL;
408
409         rel = heap_open(relid, AccessShareLock);
410         tupleDesc = RelationGetDescr(rel);
411         natts = tupleDesc->natts;
412
413         /* Retrieve FDW options for all user-defined attributes. */
414         for (attnum = 1; attnum <= natts; attnum++)
415         {
416                 Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
417                 List       *options;
418                 ListCell   *lc;
419
420                 /* Skip dropped attributes. */
421                 if (attr->attisdropped)
422                         continue;
423
424                 options = GetForeignColumnOptions(relid, attnum);
425                 foreach(lc, options)
426                 {
427                         DefElem    *def = (DefElem *) lfirst(lc);
428
429                         if (strcmp(def->defname, "force_not_null") == 0)
430                         {
431                                 if (defGetBoolean(def))
432                                 {
433                                         char       *attname = pstrdup(NameStr(attr->attname));
434
435                                         fnncolumns = lappend(fnncolumns, makeString(attname));
436                                 }
437                         }
438                         else if (strcmp(def->defname, "force_null") == 0)
439                         {
440                                 if (defGetBoolean(def))
441                                 {
442                                         char       *attname = pstrdup(NameStr(attr->attname));
443
444                                         fncolumns = lappend(fncolumns, makeString(attname));
445                                 }
446                         }
447                         /* maybe in future handle other options here */
448                 }
449         }
450
451         heap_close(rel, AccessShareLock);
452
453         /*
454          * Return DefElem only when some column(s) have force_not_null /
455          * force_null options set
456          */
457         if (fnncolumns != NIL)
458                 options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns));
459
460         if (fncolumns != NIL)
461                 options = lappend(options, makeDefElem("force_null", (Node *) fncolumns));
462
463         return options;
464 }
465
466 /*
467  * fileGetForeignRelSize
468  *              Obtain relation size estimates for a foreign table
469  */
470 static void
471 fileGetForeignRelSize(PlannerInfo *root,
472                                           RelOptInfo *baserel,
473                                           Oid foreigntableid)
474 {
475         FileFdwPlanState *fdw_private;
476
477         /*
478          * Fetch options.  We only need filename at this point, but we might as
479          * well get everything and not need to re-fetch it later in planning.
480          */
481         fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
482         fileGetOptions(foreigntableid,
483                                    &fdw_private->filename, &fdw_private->options);
484         baserel->fdw_private = (void *) fdw_private;
485
486         /* Estimate relation size */
487         estimate_size(root, baserel, fdw_private);
488 }
489
490 /*
491  * fileGetForeignPaths
492  *              Create possible access paths for a scan on the foreign table
493  *
494  *              Currently we don't support any push-down feature, so there is only one
495  *              possible access path, which simply returns all records in the order in
496  *              the data file.
497  */
498 static void
499 fileGetForeignPaths(PlannerInfo *root,
500                                         RelOptInfo *baserel,
501                                         Oid foreigntableid)
502 {
503         FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
504         Cost            startup_cost;
505         Cost            total_cost;
506         List       *columns;
507         List       *coptions = NIL;
508
509         /* Decide whether to selectively perform binary conversion */
510         if (check_selective_binary_conversion(baserel,
511                                                                                   foreigntableid,
512                                                                                   &columns))
513                 coptions = list_make1(makeDefElem("convert_selectively",
514                                                                                   (Node *) columns));
515
516         /* Estimate costs */
517         estimate_costs(root, baserel, fdw_private,
518                                    &startup_cost, &total_cost);
519
520         /*
521          * Create a ForeignPath node and add it as only possible path.  We use the
522          * fdw_private list of the path to carry the convert_selectively option;
523          * it will be propagated into the fdw_private list of the Plan node.
524          */
525         add_path(baserel, (Path *)
526                          create_foreignscan_path(root, baserel,
527                                                                          NULL,          /* default pathtarget */
528                                                                          baserel->rows,
529                                                                          startup_cost,
530                                                                          total_cost,
531                                                                          NIL,           /* no pathkeys */
532                                                                          NULL,          /* no outer rel either */
533                                                                          NULL,          /* no extra plan */
534                                                                          coptions));
535
536         /*
537          * If data file was sorted, and we knew it somehow, we could insert
538          * appropriate pathkeys into the ForeignPath node to tell the planner
539          * that.
540          */
541 }
542
543 /*
544  * fileGetForeignPlan
545  *              Create a ForeignScan plan node for scanning the foreign table
546  */
547 static ForeignScan *
548 fileGetForeignPlan(PlannerInfo *root,
549                                    RelOptInfo *baserel,
550                                    Oid foreigntableid,
551                                    ForeignPath *best_path,
552                                    List *tlist,
553                                    List *scan_clauses,
554                                    Plan *outer_plan)
555 {
556         Index           scan_relid = baserel->relid;
557
558         /*
559          * We have no native ability to evaluate restriction clauses, so we just
560          * put all the scan_clauses into the plan node's qual list for the
561          * executor to check.  So all we have to do here is strip RestrictInfo
562          * nodes from the clauses and ignore pseudoconstants (which will be
563          * handled elsewhere).
564          */
565         scan_clauses = extract_actual_clauses(scan_clauses, false);
566
567         /* Create the ForeignScan node */
568         return make_foreignscan(tlist,
569                                                         scan_clauses,
570                                                         scan_relid,
571                                                         NIL,    /* no expressions to evaluate */
572                                                         best_path->fdw_private,
573                                                         NIL,    /* no custom tlist */
574                                                         NIL,    /* no remote quals */
575                                                         outer_plan);
576 }
577
578 /*
579  * fileExplainForeignScan
580  *              Produce extra output for EXPLAIN
581  */
582 static void
583 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
584 {
585         char       *filename;
586         List       *options;
587
588         /* Fetch options --- we only need filename at this point */
589         fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
590                                    &filename, &options);
591
592         ExplainPropertyText("Foreign File", filename, es);
593
594         /* Suppress file size if we're not showing cost details */
595         if (es->costs)
596         {
597                 struct stat stat_buf;
598
599                 if (stat(filename, &stat_buf) == 0)
600                         ExplainPropertyLong("Foreign File Size", (long) stat_buf.st_size,
601                                                                 es);
602         }
603 }
604
605 /*
606  * fileBeginForeignScan
607  *              Initiate access to the file by creating CopyState
608  */
609 static void
610 fileBeginForeignScan(ForeignScanState *node, int eflags)
611 {
612         ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
613         char       *filename;
614         List       *options;
615         CopyState       cstate;
616         FileFdwExecutionState *festate;
617
618         /*
619          * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
620          */
621         if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
622                 return;
623
624         /* Fetch options of foreign table */
625         fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
626                                    &filename, &options);
627
628         /* Add any options from the plan (currently only convert_selectively) */
629         options = list_concat(options, plan->fdw_private);
630
631         /*
632          * Create CopyState from FDW options.  We always acquire all columns, so
633          * as to match the expected ScanTupleSlot signature.
634          */
635         cstate = BeginCopyFrom(node->ss.ss_currentRelation,
636                                                    filename,
637                                                    false,
638                                                    NIL,
639                                                    options);
640
641         /*
642          * Save state in node->fdw_state.  We must save enough information to call
643          * BeginCopyFrom() again.
644          */
645         festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
646         festate->filename = filename;
647         festate->options = options;
648         festate->cstate = cstate;
649
650         node->fdw_state = (void *) festate;
651 }
652
653 /*
654  * fileIterateForeignScan
655  *              Read next record from the data file and store it into the
656  *              ScanTupleSlot as a virtual tuple
657  */
658 static TupleTableSlot *
659 fileIterateForeignScan(ForeignScanState *node)
660 {
661         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
662         TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
663         bool            found;
664         ErrorContextCallback errcallback;
665
666         /* Set up callback to identify error line number. */
667         errcallback.callback = CopyFromErrorCallback;
668         errcallback.arg = (void *) festate->cstate;
669         errcallback.previous = error_context_stack;
670         error_context_stack = &errcallback;
671
672         /*
673          * The protocol for loading a virtual tuple into a slot is first
674          * ExecClearTuple, then fill the values/isnull arrays, then
675          * ExecStoreVirtualTuple.  If we don't find another row in the file, we
676          * just skip the last step, leaving the slot empty as required.
677          *
678          * We can pass ExprContext = NULL because we read all columns from the
679          * file, so no need to evaluate default expressions.
680          *
681          * We can also pass tupleOid = NULL because we don't allow oids for
682          * foreign tables.
683          */
684         ExecClearTuple(slot);
685         found = NextCopyFrom(festate->cstate, NULL,
686                                                  slot->tts_values, slot->tts_isnull,
687                                                  NULL);
688         if (found)
689                 ExecStoreVirtualTuple(slot);
690
691         /* Remove error callback. */
692         error_context_stack = errcallback.previous;
693
694         return slot;
695 }
696
697 /*
698  * fileReScanForeignScan
699  *              Rescan table, possibly with new parameters
700  */
701 static void
702 fileReScanForeignScan(ForeignScanState *node)
703 {
704         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
705
706         EndCopyFrom(festate->cstate);
707
708         festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
709                                                                         festate->filename,
710                                                                         false,
711                                                                         NIL,
712                                                                         festate->options);
713 }
714
715 /*
716  * fileEndForeignScan
717  *              Finish scanning foreign table and dispose objects used for this scan
718  */
719 static void
720 fileEndForeignScan(ForeignScanState *node)
721 {
722         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
723
724         /* if festate is NULL, we are in EXPLAIN; nothing to do */
725         if (festate)
726                 EndCopyFrom(festate->cstate);
727 }
728
729 /*
730  * fileAnalyzeForeignTable
731  *              Test whether analyzing this foreign table is supported
732  */
733 static bool
734 fileAnalyzeForeignTable(Relation relation,
735                                                 AcquireSampleRowsFunc *func,
736                                                 BlockNumber *totalpages)
737 {
738         char       *filename;
739         List       *options;
740         struct stat stat_buf;
741
742         /* Fetch options of foreign table */
743         fileGetOptions(RelationGetRelid(relation), &filename, &options);
744
745         /*
746          * Get size of the file.  (XXX if we fail here, would it be better to just
747          * return false to skip analyzing the table?)
748          */
749         if (stat(filename, &stat_buf) < 0)
750                 ereport(ERROR,
751                                 (errcode_for_file_access(),
752                                  errmsg("could not stat file \"%s\": %m",
753                                                 filename)));
754
755         /*
756          * Convert size to pages.  Must return at least 1 so that we can tell
757          * later on that pg_class.relpages is not default.
758          */
759         *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
760         if (*totalpages < 1)
761                 *totalpages = 1;
762
763         *func = file_acquire_sample_rows;
764
765         return true;
766 }
767
768 /*
769  * fileIsForeignScanParallelSafe
770  *              Reading a file in a parallel worker should work just the same as
771  *              reading it in the leader, so mark scans safe.
772  */
773 static bool
774 fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
775                                                           RangeTblEntry *rte)
776 {
777         return true;
778 }
779
780 /*
781  * check_selective_binary_conversion
782  *
783  * Check to see if it's useful to convert only a subset of the file's columns
784  * to binary.  If so, construct a list of the column names to be converted,
785  * return that at *columns, and return TRUE.  (Note that it's possible to
786  * determine that no columns need be converted, for instance with a COUNT(*)
787  * query.  So we can't use returning a NIL list to indicate failure.)
788  */
789 static bool
790 check_selective_binary_conversion(RelOptInfo *baserel,
791                                                                   Oid foreigntableid,
792                                                                   List **columns)
793 {
794         ForeignTable *table;
795         ListCell   *lc;
796         Relation        rel;
797         TupleDesc       tupleDesc;
798         AttrNumber      attnum;
799         Bitmapset  *attrs_used = NULL;
800         bool            has_wholerow = false;
801         int                     numattrs;
802         int                     i;
803
804         *columns = NIL;                         /* default result */
805
806         /*
807          * Check format of the file.  If binary format, this is irrelevant.
808          */
809         table = GetForeignTable(foreigntableid);
810         foreach(lc, table->options)
811         {
812                 DefElem    *def = (DefElem *) lfirst(lc);
813
814                 if (strcmp(def->defname, "format") == 0)
815                 {
816                         char       *format = defGetString(def);
817
818                         if (strcmp(format, "binary") == 0)
819                                 return false;
820                         break;
821                 }
822         }
823
824         /* Collect all the attributes needed for joins or final output. */
825         pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
826                                    &attrs_used);
827
828         /* Add all the attributes used by restriction clauses. */
829         foreach(lc, baserel->baserestrictinfo)
830         {
831                 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
832
833                 pull_varattnos((Node *) rinfo->clause, baserel->relid,
834                                            &attrs_used);
835         }
836
837         /* Convert attribute numbers to column names. */
838         rel = heap_open(foreigntableid, AccessShareLock);
839         tupleDesc = RelationGetDescr(rel);
840
841         while ((attnum = bms_first_member(attrs_used)) >= 0)
842         {
843                 /* Adjust for system attributes. */
844                 attnum += FirstLowInvalidHeapAttributeNumber;
845
846                 if (attnum == 0)
847                 {
848                         has_wholerow = true;
849                         break;
850                 }
851
852                 /* Ignore system attributes. */
853                 if (attnum < 0)
854                         continue;
855
856                 /* Get user attributes. */
857                 if (attnum > 0)
858                 {
859                         Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
860                         char       *attname = NameStr(attr->attname);
861
862                         /* Skip dropped attributes (probably shouldn't see any here). */
863                         if (attr->attisdropped)
864                                 continue;
865                         *columns = lappend(*columns, makeString(pstrdup(attname)));
866                 }
867         }
868
869         /* Count non-dropped user attributes while we have the tupdesc. */
870         numattrs = 0;
871         for (i = 0; i < tupleDesc->natts; i++)
872         {
873                 Form_pg_attribute attr = tupleDesc->attrs[i];
874
875                 if (attr->attisdropped)
876                         continue;
877                 numattrs++;
878         }
879
880         heap_close(rel, AccessShareLock);
881
882         /* If there's a whole-row reference, fail: we need all the columns. */
883         if (has_wholerow)
884         {
885                 *columns = NIL;
886                 return false;
887         }
888
889         /* If all the user attributes are needed, fail. */
890         if (numattrs == list_length(*columns))
891         {
892                 *columns = NIL;
893                 return false;
894         }
895
896         return true;
897 }
898
899 /*
900  * Estimate size of a foreign table.
901  *
902  * The main result is returned in baserel->rows.  We also set
903  * fdw_private->pages and fdw_private->ntuples for later use in the cost
904  * calculation.
905  */
906 static void
907 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
908                           FileFdwPlanState *fdw_private)
909 {
910         struct stat stat_buf;
911         BlockNumber pages;
912         double          ntuples;
913         double          nrows;
914
915         /*
916          * Get size of the file.  It might not be there at plan time, though, in
917          * which case we have to use a default estimate.
918          */
919         if (stat(fdw_private->filename, &stat_buf) < 0)
920                 stat_buf.st_size = 10 * BLCKSZ;
921
922         /*
923          * Convert size to pages for use in I/O cost estimate later.
924          */
925         pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
926         if (pages < 1)
927                 pages = 1;
928         fdw_private->pages = pages;
929
930         /*
931          * Estimate the number of tuples in the file.
932          */
933         if (baserel->pages > 0)
934         {
935                 /*
936                  * We have # of pages and # of tuples from pg_class (that is, from a
937                  * previous ANALYZE), so compute a tuples-per-page estimate and scale
938                  * that by the current file size.
939                  */
940                 double          density;
941
942                 density = baserel->tuples / (double) baserel->pages;
943                 ntuples = clamp_row_est(density * (double) pages);
944         }
945         else
946         {
947                 /*
948                  * Otherwise we have to fake it.  We back into this estimate using the
949                  * planner's idea of the relation width; which is bogus if not all
950                  * columns are being read, not to mention that the text representation
951                  * of a row probably isn't the same size as its internal
952                  * representation.  Possibly we could do something better, but the
953                  * real answer to anyone who complains is "ANALYZE" ...
954                  */
955                 int                     tuple_width;
956
957                 tuple_width = MAXALIGN(baserel->reltarget->width) +
958                         MAXALIGN(SizeofHeapTupleHeader);
959                 ntuples = clamp_row_est((double) stat_buf.st_size /
960                                                                 (double) tuple_width);
961         }
962         fdw_private->ntuples = ntuples;
963
964         /*
965          * Now estimate the number of rows returned by the scan after applying the
966          * baserestrictinfo quals.
967          */
968         nrows = ntuples *
969                 clauselist_selectivity(root,
970                                                            baserel->baserestrictinfo,
971                                                            0,
972                                                            JOIN_INNER,
973                                                            NULL);
974
975         nrows = clamp_row_est(nrows);
976
977         /* Save the output-rows estimate for the planner */
978         baserel->rows = nrows;
979 }
980
981 /*
982  * Estimate costs of scanning a foreign table.
983  *
984  * Results are returned in *startup_cost and *total_cost.
985  */
986 static void
987 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
988                            FileFdwPlanState *fdw_private,
989                            Cost *startup_cost, Cost *total_cost)
990 {
991         BlockNumber pages = fdw_private->pages;
992         double          ntuples = fdw_private->ntuples;
993         Cost            run_cost = 0;
994         Cost            cpu_per_tuple;
995
996         /*
997          * We estimate costs almost the same way as cost_seqscan(), thus assuming
998          * that I/O costs are equivalent to a regular table file of the same size.
999          * However, we take per-tuple CPU costs as 10x of a seqscan, to account
1000          * for the cost of parsing records.
1001          */
1002         run_cost += seq_page_cost * pages;
1003
1004         *startup_cost = baserel->baserestrictcost.startup;
1005         cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
1006         run_cost += cpu_per_tuple * ntuples;
1007         *total_cost = *startup_cost + run_cost;
1008 }
1009
1010 /*
1011  * file_acquire_sample_rows -- acquire a random sample of rows from the table
1012  *
1013  * Selected rows are returned in the caller-allocated array rows[],
1014  * which must have at least targrows entries.
1015  * The actual number of rows selected is returned as the function result.
1016  * We also count the total number of rows in the file and return it into
1017  * *totalrows.  Note that *totaldeadrows is always set to 0.
1018  *
1019  * Note that the returned list of rows is not always in order by physical
1020  * position in the file.  Therefore, correlation estimates derived later
1021  * may be meaningless, but it's OK because we don't use the estimates
1022  * currently (the planner only pays attention to correlation for indexscans).
1023  */
1024 static int
1025 file_acquire_sample_rows(Relation onerel, int elevel,
1026                                                  HeapTuple *rows, int targrows,
1027                                                  double *totalrows, double *totaldeadrows)
1028 {
1029         int                     numrows = 0;
1030         double          rowstoskip = -1;        /* -1 means not set yet */
1031         ReservoirStateData rstate;
1032         TupleDesc       tupDesc;
1033         Datum      *values;
1034         bool       *nulls;
1035         bool            found;
1036         char       *filename;
1037         List       *options;
1038         CopyState       cstate;
1039         ErrorContextCallback errcallback;
1040         MemoryContext oldcontext = CurrentMemoryContext;
1041         MemoryContext tupcontext;
1042
1043         Assert(onerel);
1044         Assert(targrows > 0);
1045
1046         tupDesc = RelationGetDescr(onerel);
1047         values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1048         nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1049
1050         /* Fetch options of foreign table */
1051         fileGetOptions(RelationGetRelid(onerel), &filename, &options);
1052
1053         /*
1054          * Create CopyState from FDW options.
1055          */
1056         cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
1057
1058         /*
1059          * Use per-tuple memory context to prevent leak of memory used to read
1060          * rows from the file with Copy routines.
1061          */
1062         tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1063                                                                            "file_fdw temporary context",
1064                                                                            ALLOCSET_DEFAULT_SIZES);
1065
1066         /* Prepare for sampling rows */
1067         reservoir_init_selection_state(&rstate, targrows);
1068
1069         /* Set up callback to identify error line number. */
1070         errcallback.callback = CopyFromErrorCallback;
1071         errcallback.arg = (void *) cstate;
1072         errcallback.previous = error_context_stack;
1073         error_context_stack = &errcallback;
1074
1075         *totalrows = 0;
1076         *totaldeadrows = 0;
1077         for (;;)
1078         {
1079                 /* Check for user-requested abort or sleep */
1080                 vacuum_delay_point();
1081
1082                 /* Fetch next row */
1083                 MemoryContextReset(tupcontext);
1084                 MemoryContextSwitchTo(tupcontext);
1085
1086                 found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
1087
1088                 MemoryContextSwitchTo(oldcontext);
1089
1090                 if (!found)
1091                         break;
1092
1093                 /*
1094                  * The first targrows sample rows are simply copied into the
1095                  * reservoir.  Then we start replacing tuples in the sample until we
1096                  * reach the end of the relation. This algorithm is from Jeff Vitter's
1097                  * paper (see more info in commands/analyze.c).
1098                  */
1099                 if (numrows < targrows)
1100                 {
1101                         rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1102                 }
1103                 else
1104                 {
1105                         /*
1106                          * t in Vitter's paper is the number of records already processed.
1107                          * If we need to compute a new S value, we must use the
1108                          * not-yet-incremented value of totalrows as t.
1109                          */
1110                         if (rowstoskip < 0)
1111                                 rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);
1112
1113                         if (rowstoskip <= 0)
1114                         {
1115                                 /*
1116                                  * Found a suitable tuple, so save it, replacing one old tuple
1117                                  * at random
1118                                  */
1119                                 int                     k = (int) (targrows * sampler_random_fract(rstate.randstate));
1120
1121                                 Assert(k >= 0 && k < targrows);
1122                                 heap_freetuple(rows[k]);
1123                                 rows[k] = heap_form_tuple(tupDesc, values, nulls);
1124                         }
1125
1126                         rowstoskip -= 1;
1127                 }
1128
1129                 *totalrows += 1;
1130         }
1131
1132         /* Remove error callback. */
1133         error_context_stack = errcallback.previous;
1134
1135         /* Clean up. */
1136         MemoryContextDelete(tupcontext);
1137
1138         EndCopyFrom(cstate);
1139
1140         pfree(values);
1141         pfree(nulls);
1142
1143         /*
1144          * Emit some interesting relation info
1145          */
1146         ereport(elevel,
1147                         (errmsg("\"%s\": file contains %.0f rows; "
1148                                         "%d rows in sample",
1149                                         RelationGetRelationName(onerel),
1150                                         *totalrows, numrows)));
1151
1152         return numrows;
1153 }