granicus.if.org Git - postgresql/blob - contrib/file_fdw/file_fdw.c
Update copyright for 2016
[postgresql] / contrib / file_fdw / file_fdw.c
1 /*-------------------------------------------------------------------------
2  *
3  * file_fdw.c
4  *                foreign-data wrapper for server-side flat files.
5  *
6  * Copyright (c) 2010-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *                contrib/file_fdw/file_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14
15 #include <sys/stat.h>
16 #include <unistd.h>
17
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "catalog/pg_foreign_table.h"
22 #include "commands/copy.h"
23 #include "commands/defrem.h"
24 #include "commands/explain.h"
25 #include "commands/vacuum.h"
26 #include "foreign/fdwapi.h"
27 #include "foreign/foreign.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/pathnode.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 #include "utils/sampling.h"
38
/* Module magic marker; the macro comes from the included PostgreSQL headers. */
PG_MODULE_MAGIC;
40
41 /*
42  * Describes the valid options for objects that use this wrapper.
43  */
44 struct FileFdwOption
45 {
46         const char *optname;
47         Oid                     optcontext;             /* Oid of catalog in which option may appear */
48 };
49
50 /*
51  * Valid options for file_fdw.
52  * These options are based on the options for the COPY FROM command.
53  * But note that force_not_null and force_null are handled as boolean options
54  * attached to a column, not as table options.
55  *
56  * Note: If you are adding new option for user mapping, you need to modify
57  * fileGetOptions(), which currently doesn't bother to look at user mappings.
58  */
59 static const struct FileFdwOption valid_options[] = {
60         /* File options */
61         {"filename", ForeignTableRelationId},
62
63         /* Format options */
64         /* oids option is not supported */
65         {"format", ForeignTableRelationId},
66         {"header", ForeignTableRelationId},
67         {"delimiter", ForeignTableRelationId},
68         {"quote", ForeignTableRelationId},
69         {"escape", ForeignTableRelationId},
70         {"null", ForeignTableRelationId},
71         {"encoding", ForeignTableRelationId},
72         {"force_not_null", AttributeRelationId},
73         {"force_null", AttributeRelationId},
74
75         /*
76          * force_quote is not supported by file_fdw because it's for COPY TO.
77          */
78
79         /* Sentinel */
80         {NULL, InvalidOid}
81 };
82
83 /*
84  * FDW-specific information for RelOptInfo.fdw_private.
85  */
86 typedef struct FileFdwPlanState
87 {
88         char       *filename;           /* file to read */
89         List       *options;            /* merged COPY options, excluding filename */
90         BlockNumber pages;                      /* estimate of file's physical size */
91         double          ntuples;                /* estimate of number of rows in file */
92 } FileFdwPlanState;
93
94 /*
95  * FDW-specific information for ForeignScanState.fdw_state.
96  */
97 typedef struct FileFdwExecutionState
98 {
99         char       *filename;           /* file to read */
100         List       *options;            /* merged COPY options, excluding filename */
101         CopyState       cstate;                 /* state of reading file */
102 } FileFdwExecutionState;
103
104 /*
105  * SQL functions
106  */
107 PG_FUNCTION_INFO_V1(file_fdw_handler);
108 PG_FUNCTION_INFO_V1(file_fdw_validator);
109
110 /*
111  * FDW callback routines
112  */
113 static void fileGetForeignRelSize(PlannerInfo *root,
114                                           RelOptInfo *baserel,
115                                           Oid foreigntableid);
116 static void fileGetForeignPaths(PlannerInfo *root,
117                                         RelOptInfo *baserel,
118                                         Oid foreigntableid);
119 static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
120                                    RelOptInfo *baserel,
121                                    Oid foreigntableid,
122                                    ForeignPath *best_path,
123                                    List *tlist,
124                                    List *scan_clauses,
125                                    Plan *outer_plan);
126 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
127 static void fileBeginForeignScan(ForeignScanState *node, int eflags);
128 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
129 static void fileReScanForeignScan(ForeignScanState *node);
130 static void fileEndForeignScan(ForeignScanState *node);
131 static bool fileAnalyzeForeignTable(Relation relation,
132                                                 AcquireSampleRowsFunc *func,
133                                                 BlockNumber *totalpages);
134
135 /*
136  * Helper functions
137  */
138 static bool is_valid_option(const char *option, Oid context);
139 static void fileGetOptions(Oid foreigntableid,
140                            char **filename, List **other_options);
141 static List *get_file_fdw_attribute_options(Oid relid);
142 static bool check_selective_binary_conversion(RelOptInfo *baserel,
143                                                                   Oid foreigntableid,
144                                                                   List **columns);
145 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
146                           FileFdwPlanState *fdw_private);
147 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
148                            FileFdwPlanState *fdw_private,
149                            Cost *startup_cost, Cost *total_cost);
150 static int file_acquire_sample_rows(Relation onerel, int elevel,
151                                                  HeapTuple *rows, int targrows,
152                                                  double *totalrows, double *totaldeadrows);
153
154
155 /*
156  * Foreign-data wrapper handler function: return a struct with pointers
157  * to my callback routines.
158  */
159 Datum
160 file_fdw_handler(PG_FUNCTION_ARGS)
161 {
162         FdwRoutine *fdwroutine = makeNode(FdwRoutine);
163
164         fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
165         fdwroutine->GetForeignPaths = fileGetForeignPaths;
166         fdwroutine->GetForeignPlan = fileGetForeignPlan;
167         fdwroutine->ExplainForeignScan = fileExplainForeignScan;
168         fdwroutine->BeginForeignScan = fileBeginForeignScan;
169         fdwroutine->IterateForeignScan = fileIterateForeignScan;
170         fdwroutine->ReScanForeignScan = fileReScanForeignScan;
171         fdwroutine->EndForeignScan = fileEndForeignScan;
172         fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
173
174         PG_RETURN_POINTER(fdwroutine);
175 }
176
177 /*
178  * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
179  * USER MAPPING or FOREIGN TABLE that uses file_fdw.
180  *
181  * Raise an ERROR if the option or its value is considered invalid.
182  */
183 Datum
184 file_fdw_validator(PG_FUNCTION_ARGS)
185 {
186         List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
187         Oid                     catalog = PG_GETARG_OID(1);
188         char       *filename = NULL;
189         DefElem    *force_not_null = NULL;
190         DefElem    *force_null = NULL;
191         List       *other_options = NIL;
192         ListCell   *cell;
193
194         /*
195          * Only superusers are allowed to set options of a file_fdw foreign table.
196          * This is because the filename is one of those options, and we don't want
197          * non-superusers to be able to determine which file gets read.
198          *
199          * Putting this sort of permissions check in a validator is a bit of a
200          * crock, but there doesn't seem to be any other place that can enforce
201          * the check more cleanly.
202          *
203          * Note that the valid_options[] array disallows setting filename at any
204          * options level other than foreign table --- otherwise there'd still be a
205          * security hole.
206          */
207         if (catalog == ForeignTableRelationId && !superuser())
208                 ereport(ERROR,
209                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
210                                  errmsg("only superuser can change options of a file_fdw foreign table")));
211
212         /*
213          * Check that only options supported by file_fdw, and allowed for the
214          * current object type, are given.
215          */
216         foreach(cell, options_list)
217         {
218                 DefElem    *def = (DefElem *) lfirst(cell);
219
220                 if (!is_valid_option(def->defname, catalog))
221                 {
222                         const struct FileFdwOption *opt;
223                         StringInfoData buf;
224
225                         /*
226                          * Unknown option specified, complain about it. Provide a hint
227                          * with list of valid options for the object.
228                          */
229                         initStringInfo(&buf);
230                         for (opt = valid_options; opt->optname; opt++)
231                         {
232                                 if (catalog == opt->optcontext)
233                                         appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
234                                                                          opt->optname);
235                         }
236
237                         ereport(ERROR,
238                                         (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
239                                          errmsg("invalid option \"%s\"", def->defname),
240                                          buf.len > 0
241                                          ? errhint("Valid options in this context are: %s",
242                                                            buf.data)
243                                   : errhint("There are no valid options in this context.")));
244                 }
245
246                 /*
247                  * Separate out filename and column-specific options, since
248                  * ProcessCopyOptions won't accept them.
249                  */
250
251                 if (strcmp(def->defname, "filename") == 0)
252                 {
253                         if (filename)
254                                 ereport(ERROR,
255                                                 (errcode(ERRCODE_SYNTAX_ERROR),
256                                                  errmsg("conflicting or redundant options")));
257                         filename = defGetString(def);
258                 }
259
260                 /*
261                  * force_not_null is a boolean option; after validation we can discard
262                  * it - it will be retrieved later in get_file_fdw_attribute_options()
263                  */
264                 else if (strcmp(def->defname, "force_not_null") == 0)
265                 {
266                         if (force_not_null)
267                                 ereport(ERROR,
268                                                 (errcode(ERRCODE_SYNTAX_ERROR),
269                                                  errmsg("conflicting or redundant options"),
270                                                  errhint("option \"force_not_null\" supplied more than once for a column")));
271                         force_not_null = def;
272                         /* Don't care what the value is, as long as it's a legal boolean */
273                         (void) defGetBoolean(def);
274                 }
275                 /* See comments for force_not_null above */
276                 else if (strcmp(def->defname, "force_null") == 0)
277                 {
278                         if (force_null)
279                                 ereport(ERROR,
280                                                 (errcode(ERRCODE_SYNTAX_ERROR),
281                                                  errmsg("conflicting or redundant options"),
282                                                  errhint("option \"force_null\" supplied more than once for a column")));
283                         force_null = def;
284                         (void) defGetBoolean(def);
285                 }
286                 else
287                         other_options = lappend(other_options, def);
288         }
289
290         /*
291          * Now apply the core COPY code's validation logic for more checks.
292          */
293         ProcessCopyOptions(NULL, true, other_options);
294
295         /*
296          * Filename option is required for file_fdw foreign tables.
297          */
298         if (catalog == ForeignTableRelationId && filename == NULL)
299                 ereport(ERROR,
300                                 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
301                                  errmsg("filename is required for file_fdw foreign tables")));
302
303         PG_RETURN_VOID();
304 }
305
306 /*
307  * Check if the provided option is one of the valid options.
308  * context is the Oid of the catalog holding the object the option is for.
309  */
310 static bool
311 is_valid_option(const char *option, Oid context)
312 {
313         const struct FileFdwOption *opt;
314
315         for (opt = valid_options; opt->optname; opt++)
316         {
317                 if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
318                         return true;
319         }
320         return false;
321 }
322
323 /*
324  * Fetch the options for a file_fdw foreign table.
325  *
326  * We have to separate out "filename" from the other options because
327  * it must not appear in the options list passed to the core COPY code.
328  */
329 static void
330 fileGetOptions(Oid foreigntableid,
331                            char **filename, List **other_options)
332 {
333         ForeignTable *table;
334         ForeignServer *server;
335         ForeignDataWrapper *wrapper;
336         List       *options;
337         ListCell   *lc,
338                            *prev;
339
340         /*
341          * Extract options from FDW objects.  We ignore user mappings because
342          * file_fdw doesn't have any options that can be specified there.
343          *
344          * (XXX Actually, given the current contents of valid_options[], there's
345          * no point in examining anything except the foreign table's own options.
346          * Simplify?)
347          */
348         table = GetForeignTable(foreigntableid);
349         server = GetForeignServer(table->serverid);
350         wrapper = GetForeignDataWrapper(server->fdwid);
351
352         options = NIL;
353         options = list_concat(options, wrapper->options);
354         options = list_concat(options, server->options);
355         options = list_concat(options, table->options);
356         options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
357
358         /*
359          * Separate out the filename.
360          */
361         *filename = NULL;
362         prev = NULL;
363         foreach(lc, options)
364         {
365                 DefElem    *def = (DefElem *) lfirst(lc);
366
367                 if (strcmp(def->defname, "filename") == 0)
368                 {
369                         *filename = defGetString(def);
370                         options = list_delete_cell(options, lc, prev);
371                         break;
372                 }
373                 prev = lc;
374         }
375
376         /*
377          * The validator should have checked that a filename was included in the
378          * options, but check again, just in case.
379          */
380         if (*filename == NULL)
381                 elog(ERROR, "filename is required for file_fdw foreign tables");
382
383         *other_options = options;
384 }
385
386 /*
387  * Retrieve per-column generic options from pg_attribute and construct a list
388  * of DefElems representing them.
389  *
390  * At the moment we only have "force_not_null", and "force_null",
391  * which should each be combined into a single DefElem listing all such
392  * columns, since that's what COPY expects.
393  */
394 static List *
395 get_file_fdw_attribute_options(Oid relid)
396 {
397         Relation        rel;
398         TupleDesc       tupleDesc;
399         AttrNumber      natts;
400         AttrNumber      attnum;
401         List       *fnncolumns = NIL;
402         List       *fncolumns = NIL;
403
404         List       *options = NIL;
405
406         rel = heap_open(relid, AccessShareLock);
407         tupleDesc = RelationGetDescr(rel);
408         natts = tupleDesc->natts;
409
410         /* Retrieve FDW options for all user-defined attributes. */
411         for (attnum = 1; attnum <= natts; attnum++)
412         {
413                 Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
414                 List       *options;
415                 ListCell   *lc;
416
417                 /* Skip dropped attributes. */
418                 if (attr->attisdropped)
419                         continue;
420
421                 options = GetForeignColumnOptions(relid, attnum);
422                 foreach(lc, options)
423                 {
424                         DefElem    *def = (DefElem *) lfirst(lc);
425
426                         if (strcmp(def->defname, "force_not_null") == 0)
427                         {
428                                 if (defGetBoolean(def))
429                                 {
430                                         char       *attname = pstrdup(NameStr(attr->attname));
431
432                                         fnncolumns = lappend(fnncolumns, makeString(attname));
433                                 }
434                         }
435                         else if (strcmp(def->defname, "force_null") == 0)
436                         {
437                                 if (defGetBoolean(def))
438                                 {
439                                         char       *attname = pstrdup(NameStr(attr->attname));
440
441                                         fncolumns = lappend(fncolumns, makeString(attname));
442                                 }
443                         }
444                         /* maybe in future handle other options here */
445                 }
446         }
447
448         heap_close(rel, AccessShareLock);
449
450         /*
451          * Return DefElem only when some column(s) have force_not_null /
452          * force_null options set
453          */
454         if (fnncolumns != NIL)
455                 options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns));
456
457         if (fncolumns != NIL)
458                 options = lappend(options, makeDefElem("force_null", (Node *) fncolumns));
459
460         return options;
461 }
462
463 /*
464  * fileGetForeignRelSize
465  *              Obtain relation size estimates for a foreign table
466  */
467 static void
468 fileGetForeignRelSize(PlannerInfo *root,
469                                           RelOptInfo *baserel,
470                                           Oid foreigntableid)
471 {
472         FileFdwPlanState *fdw_private;
473
474         /*
475          * Fetch options.  We only need filename at this point, but we might as
476          * well get everything and not need to re-fetch it later in planning.
477          */
478         fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
479         fileGetOptions(foreigntableid,
480                                    &fdw_private->filename, &fdw_private->options);
481         baserel->fdw_private = (void *) fdw_private;
482
483         /* Estimate relation size */
484         estimate_size(root, baserel, fdw_private);
485 }
486
487 /*
488  * fileGetForeignPaths
489  *              Create possible access paths for a scan on the foreign table
490  *
491  *              Currently we don't support any push-down feature, so there is only one
492  *              possible access path, which simply returns all records in the order in
493  *              the data file.
494  */
495 static void
496 fileGetForeignPaths(PlannerInfo *root,
497                                         RelOptInfo *baserel,
498                                         Oid foreigntableid)
499 {
500         FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
501         Cost            startup_cost;
502         Cost            total_cost;
503         List       *columns;
504         List       *coptions = NIL;
505
506         /* Decide whether to selectively perform binary conversion */
507         if (check_selective_binary_conversion(baserel,
508                                                                                   foreigntableid,
509                                                                                   &columns))
510                 coptions = list_make1(makeDefElem("convert_selectively",
511                                                                                   (Node *) columns));
512
513         /* Estimate costs */
514         estimate_costs(root, baserel, fdw_private,
515                                    &startup_cost, &total_cost);
516
517         /*
518          * Create a ForeignPath node and add it as only possible path.  We use the
519          * fdw_private list of the path to carry the convert_selectively option;
520          * it will be propagated into the fdw_private list of the Plan node.
521          */
522         add_path(baserel, (Path *)
523                          create_foreignscan_path(root, baserel,
524                                                                          baserel->rows,
525                                                                          startup_cost,
526                                                                          total_cost,
527                                                                          NIL,           /* no pathkeys */
528                                                                          NULL,          /* no outer rel either */
529                                                                          NULL,          /* no extra plan */
530                                                                          coptions));
531
532         /*
533          * If data file was sorted, and we knew it somehow, we could insert
534          * appropriate pathkeys into the ForeignPath node to tell the planner
535          * that.
536          */
537 }
538
539 /*
540  * fileGetForeignPlan
541  *              Create a ForeignScan plan node for scanning the foreign table
542  */
543 static ForeignScan *
544 fileGetForeignPlan(PlannerInfo *root,
545                                    RelOptInfo *baserel,
546                                    Oid foreigntableid,
547                                    ForeignPath *best_path,
548                                    List *tlist,
549                                    List *scan_clauses,
550                                    Plan *outer_plan)
551 {
552         Index           scan_relid = baserel->relid;
553
554         /*
555          * We have no native ability to evaluate restriction clauses, so we just
556          * put all the scan_clauses into the plan node's qual list for the
557          * executor to check.  So all we have to do here is strip RestrictInfo
558          * nodes from the clauses and ignore pseudoconstants (which will be
559          * handled elsewhere).
560          */
561         scan_clauses = extract_actual_clauses(scan_clauses, false);
562
563         /* Create the ForeignScan node */
564         return make_foreignscan(tlist,
565                                                         scan_clauses,
566                                                         scan_relid,
567                                                         NIL,    /* no expressions to evaluate */
568                                                         best_path->fdw_private,
569                                                         NIL,    /* no custom tlist */
570                                                         NIL,    /* no remote quals */
571                                                         outer_plan);
572 }
573
574 /*
575  * fileExplainForeignScan
576  *              Produce extra output for EXPLAIN
577  */
578 static void
579 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
580 {
581         char       *filename;
582         List       *options;
583
584         /* Fetch options --- we only need filename at this point */
585         fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
586                                    &filename, &options);
587
588         ExplainPropertyText("Foreign File", filename, es);
589
590         /* Suppress file size if we're not showing cost details */
591         if (es->costs)
592         {
593                 struct stat stat_buf;
594
595                 if (stat(filename, &stat_buf) == 0)
596                         ExplainPropertyLong("Foreign File Size", (long) stat_buf.st_size,
597                                                                 es);
598         }
599 }
600
601 /*
602  * fileBeginForeignScan
603  *              Initiate access to the file by creating CopyState
604  */
605 static void
606 fileBeginForeignScan(ForeignScanState *node, int eflags)
607 {
608         ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
609         char       *filename;
610         List       *options;
611         CopyState       cstate;
612         FileFdwExecutionState *festate;
613
614         /*
615          * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
616          */
617         if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
618                 return;
619
620         /* Fetch options of foreign table */
621         fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
622                                    &filename, &options);
623
624         /* Add any options from the plan (currently only convert_selectively) */
625         options = list_concat(options, plan->fdw_private);
626
627         /*
628          * Create CopyState from FDW options.  We always acquire all columns, so
629          * as to match the expected ScanTupleSlot signature.
630          */
631         cstate = BeginCopyFrom(node->ss.ss_currentRelation,
632                                                    filename,
633                                                    false,
634                                                    NIL,
635                                                    options);
636
637         /*
638          * Save state in node->fdw_state.  We must save enough information to call
639          * BeginCopyFrom() again.
640          */
641         festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
642         festate->filename = filename;
643         festate->options = options;
644         festate->cstate = cstate;
645
646         node->fdw_state = (void *) festate;
647 }
648
649 /*
650  * fileIterateForeignScan
651  *              Read next record from the data file and store it into the
652  *              ScanTupleSlot as a virtual tuple
653  */
654 static TupleTableSlot *
655 fileIterateForeignScan(ForeignScanState *node)
656 {
657         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
658         TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
659         bool            found;
660         ErrorContextCallback errcallback;
661
662         /* Set up callback to identify error line number. */
663         errcallback.callback = CopyFromErrorCallback;
664         errcallback.arg = (void *) festate->cstate;
665         errcallback.previous = error_context_stack;
666         error_context_stack = &errcallback;
667
668         /*
669          * The protocol for loading a virtual tuple into a slot is first
670          * ExecClearTuple, then fill the values/isnull arrays, then
671          * ExecStoreVirtualTuple.  If we don't find another row in the file, we
672          * just skip the last step, leaving the slot empty as required.
673          *
674          * We can pass ExprContext = NULL because we read all columns from the
675          * file, so no need to evaluate default expressions.
676          *
677          * We can also pass tupleOid = NULL because we don't allow oids for
678          * foreign tables.
679          */
680         ExecClearTuple(slot);
681         found = NextCopyFrom(festate->cstate, NULL,
682                                                  slot->tts_values, slot->tts_isnull,
683                                                  NULL);
684         if (found)
685                 ExecStoreVirtualTuple(slot);
686
687         /* Remove error callback. */
688         error_context_stack = errcallback.previous;
689
690         return slot;
691 }
692
693 /*
694  * fileReScanForeignScan
695  *              Rescan table, possibly with new parameters
696  */
697 static void
698 fileReScanForeignScan(ForeignScanState *node)
699 {
700         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
701
702         EndCopyFrom(festate->cstate);
703
704         festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
705                                                                         festate->filename,
706                                                                         false,
707                                                                         NIL,
708                                                                         festate->options);
709 }
710
711 /*
712  * fileEndForeignScan
713  *              Finish scanning foreign table and dispose objects used for this scan
714  */
715 static void
716 fileEndForeignScan(ForeignScanState *node)
717 {
718         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
719
720         /* if festate is NULL, we are in EXPLAIN; nothing to do */
721         if (festate)
722                 EndCopyFrom(festate->cstate);
723 }
724
725 /*
726  * fileAnalyzeForeignTable
727  *              Test whether analyzing this foreign table is supported
728  */
729 static bool
730 fileAnalyzeForeignTable(Relation relation,
731                                                 AcquireSampleRowsFunc *func,
732                                                 BlockNumber *totalpages)
733 {
734         char       *filename;
735         List       *options;
736         struct stat stat_buf;
737
738         /* Fetch options of foreign table */
739         fileGetOptions(RelationGetRelid(relation), &filename, &options);
740
741         /*
742          * Get size of the file.  (XXX if we fail here, would it be better to just
743          * return false to skip analyzing the table?)
744          */
745         if (stat(filename, &stat_buf) < 0)
746                 ereport(ERROR,
747                                 (errcode_for_file_access(),
748                                  errmsg("could not stat file \"%s\": %m",
749                                                 filename)));
750
751         /*
752          * Convert size to pages.  Must return at least 1 so that we can tell
753          * later on that pg_class.relpages is not default.
754          */
755         *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
756         if (*totalpages < 1)
757                 *totalpages = 1;
758
759         *func = file_acquire_sample_rows;
760
761         return true;
762 }
763
764 /*
765  * check_selective_binary_conversion
766  *
767  * Check to see if it's useful to convert only a subset of the file's columns
768  * to binary.  If so, construct a list of the column names to be converted,
769  * return that at *columns, and return TRUE.  (Note that it's possible to
770  * determine that no columns need be converted, for instance with a COUNT(*)
771  * query.  So we can't use returning a NIL list to indicate failure.)
772  */
773 static bool
774 check_selective_binary_conversion(RelOptInfo *baserel,
775                                                                   Oid foreigntableid,
776                                                                   List **columns)
777 {
778         ForeignTable *table;
779         ListCell   *lc;
780         Relation        rel;
781         TupleDesc       tupleDesc;
782         AttrNumber      attnum;
783         Bitmapset  *attrs_used = NULL;
784         bool            has_wholerow = false;
785         int                     numattrs;
786         int                     i;
787
788         *columns = NIL;                         /* default result */
789
790         /*
791          * Check format of the file.  If binary format, this is irrelevant.
792          */
793         table = GetForeignTable(foreigntableid);
794         foreach(lc, table->options)
795         {
796                 DefElem    *def = (DefElem *) lfirst(lc);
797
798                 if (strcmp(def->defname, "format") == 0)
799                 {
800                         char       *format = defGetString(def);
801
802                         if (strcmp(format, "binary") == 0)
803                                 return false;
804                         break;
805                 }
806         }
807
808         /* Collect all the attributes needed for joins or final output. */
809         pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
810                                    &attrs_used);
811
812         /* Add all the attributes used by restriction clauses. */
813         foreach(lc, baserel->baserestrictinfo)
814         {
815                 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
816
817                 pull_varattnos((Node *) rinfo->clause, baserel->relid,
818                                            &attrs_used);
819         }
820
821         /* Convert attribute numbers to column names. */
822         rel = heap_open(foreigntableid, AccessShareLock);
823         tupleDesc = RelationGetDescr(rel);
824
825         while ((attnum = bms_first_member(attrs_used)) >= 0)
826         {
827                 /* Adjust for system attributes. */
828                 attnum += FirstLowInvalidHeapAttributeNumber;
829
830                 if (attnum == 0)
831                 {
832                         has_wholerow = true;
833                         break;
834                 }
835
836                 /* Ignore system attributes. */
837                 if (attnum < 0)
838                         continue;
839
840                 /* Get user attributes. */
841                 if (attnum > 0)
842                 {
843                         Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
844                         char       *attname = NameStr(attr->attname);
845
846                         /* Skip dropped attributes (probably shouldn't see any here). */
847                         if (attr->attisdropped)
848                                 continue;
849                         *columns = lappend(*columns, makeString(pstrdup(attname)));
850                 }
851         }
852
853         /* Count non-dropped user attributes while we have the tupdesc. */
854         numattrs = 0;
855         for (i = 0; i < tupleDesc->natts; i++)
856         {
857                 Form_pg_attribute attr = tupleDesc->attrs[i];
858
859                 if (attr->attisdropped)
860                         continue;
861                 numattrs++;
862         }
863
864         heap_close(rel, AccessShareLock);
865
866         /* If there's a whole-row reference, fail: we need all the columns. */
867         if (has_wholerow)
868         {
869                 *columns = NIL;
870                 return false;
871         }
872
873         /* If all the user attributes are needed, fail. */
874         if (numattrs == list_length(*columns))
875         {
876                 *columns = NIL;
877                 return false;
878         }
879
880         return true;
881 }
882
883 /*
884  * Estimate size of a foreign table.
885  *
886  * The main result is returned in baserel->rows.  We also set
887  * fdw_private->pages and fdw_private->ntuples for later use in the cost
888  * calculation.
889  */
890 static void
891 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
892                           FileFdwPlanState *fdw_private)
893 {
894         struct stat stat_buf;
895         BlockNumber pages;
896         double          ntuples;
897         double          nrows;
898
899         /*
900          * Get size of the file.  It might not be there at plan time, though, in
901          * which case we have to use a default estimate.
902          */
903         if (stat(fdw_private->filename, &stat_buf) < 0)
904                 stat_buf.st_size = 10 * BLCKSZ;
905
906         /*
907          * Convert size to pages for use in I/O cost estimate later.
908          */
909         pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
910         if (pages < 1)
911                 pages = 1;
912         fdw_private->pages = pages;
913
914         /*
915          * Estimate the number of tuples in the file.
916          */
917         if (baserel->pages > 0)
918         {
919                 /*
920                  * We have # of pages and # of tuples from pg_class (that is, from a
921                  * previous ANALYZE), so compute a tuples-per-page estimate and scale
922                  * that by the current file size.
923                  */
924                 double          density;
925
926                 density = baserel->tuples / (double) baserel->pages;
927                 ntuples = clamp_row_est(density * (double) pages);
928         }
929         else
930         {
931                 /*
932                  * Otherwise we have to fake it.  We back into this estimate using the
933                  * planner's idea of the relation width; which is bogus if not all
934                  * columns are being read, not to mention that the text representation
935                  * of a row probably isn't the same size as its internal
936                  * representation.  Possibly we could do something better, but the
937                  * real answer to anyone who complains is "ANALYZE" ...
938                  */
939                 int                     tuple_width;
940
941                 tuple_width = MAXALIGN(baserel->width) +
942                         MAXALIGN(SizeofHeapTupleHeader);
943                 ntuples = clamp_row_est((double) stat_buf.st_size /
944                                                                 (double) tuple_width);
945         }
946         fdw_private->ntuples = ntuples;
947
948         /*
949          * Now estimate the number of rows returned by the scan after applying the
950          * baserestrictinfo quals.
951          */
952         nrows = ntuples *
953                 clauselist_selectivity(root,
954                                                            baserel->baserestrictinfo,
955                                                            0,
956                                                            JOIN_INNER,
957                                                            NULL);
958
959         nrows = clamp_row_est(nrows);
960
961         /* Save the output-rows estimate for the planner */
962         baserel->rows = nrows;
963 }
964
965 /*
966  * Estimate costs of scanning a foreign table.
967  *
968  * Results are returned in *startup_cost and *total_cost.
969  */
970 static void
971 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
972                            FileFdwPlanState *fdw_private,
973                            Cost *startup_cost, Cost *total_cost)
974 {
975         BlockNumber pages = fdw_private->pages;
976         double          ntuples = fdw_private->ntuples;
977         Cost            run_cost = 0;
978         Cost            cpu_per_tuple;
979
980         /*
981          * We estimate costs almost the same way as cost_seqscan(), thus assuming
982          * that I/O costs are equivalent to a regular table file of the same size.
983          * However, we take per-tuple CPU costs as 10x of a seqscan, to account
984          * for the cost of parsing records.
985          */
986         run_cost += seq_page_cost * pages;
987
988         *startup_cost = baserel->baserestrictcost.startup;
989         cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
990         run_cost += cpu_per_tuple * ntuples;
991         *total_cost = *startup_cost + run_cost;
992 }
993
994 /*
995  * file_acquire_sample_rows -- acquire a random sample of rows from the table
996  *
997  * Selected rows are returned in the caller-allocated array rows[],
998  * which must have at least targrows entries.
999  * The actual number of rows selected is returned as the function result.
1000  * We also count the total number of rows in the file and return it into
1001  * *totalrows.  Note that *totaldeadrows is always set to 0.
1002  *
1003  * Note that the returned list of rows is not always in order by physical
1004  * position in the file.  Therefore, correlation estimates derived later
1005  * may be meaningless, but it's OK because we don't use the estimates
1006  * currently (the planner only pays attention to correlation for indexscans).
1007  */
1008 static int
1009 file_acquire_sample_rows(Relation onerel, int elevel,
1010                                                  HeapTuple *rows, int targrows,
1011                                                  double *totalrows, double *totaldeadrows)
1012 {
1013         int                     numrows = 0;
1014         double          rowstoskip = -1;        /* -1 means not set yet */
1015         ReservoirStateData rstate;
1016         TupleDesc       tupDesc;
1017         Datum      *values;
1018         bool       *nulls;
1019         bool            found;
1020         char       *filename;
1021         List       *options;
1022         CopyState       cstate;
1023         ErrorContextCallback errcallback;
1024         MemoryContext oldcontext = CurrentMemoryContext;
1025         MemoryContext tupcontext;
1026
1027         Assert(onerel);
1028         Assert(targrows > 0);
1029
1030         tupDesc = RelationGetDescr(onerel);
1031         values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1032         nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1033
1034         /* Fetch options of foreign table */
1035         fileGetOptions(RelationGetRelid(onerel), &filename, &options);
1036
1037         /*
1038          * Create CopyState from FDW options.
1039          */
1040         cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
1041
1042         /*
1043          * Use per-tuple memory context to prevent leak of memory used to read
1044          * rows from the file with Copy routines.
1045          */
1046         tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1047                                                                            "file_fdw temporary context",
1048                                                                            ALLOCSET_DEFAULT_MINSIZE,
1049                                                                            ALLOCSET_DEFAULT_INITSIZE,
1050                                                                            ALLOCSET_DEFAULT_MAXSIZE);
1051
1052         /* Prepare for sampling rows */
1053         reservoir_init_selection_state(&rstate, targrows);
1054
1055         /* Set up callback to identify error line number. */
1056         errcallback.callback = CopyFromErrorCallback;
1057         errcallback.arg = (void *) cstate;
1058         errcallback.previous = error_context_stack;
1059         error_context_stack = &errcallback;
1060
1061         *totalrows = 0;
1062         *totaldeadrows = 0;
1063         for (;;)
1064         {
1065                 /* Check for user-requested abort or sleep */
1066                 vacuum_delay_point();
1067
1068                 /* Fetch next row */
1069                 MemoryContextReset(tupcontext);
1070                 MemoryContextSwitchTo(tupcontext);
1071
1072                 found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
1073
1074                 MemoryContextSwitchTo(oldcontext);
1075
1076                 if (!found)
1077                         break;
1078
1079                 /*
1080                  * The first targrows sample rows are simply copied into the
1081                  * reservoir.  Then we start replacing tuples in the sample until we
1082                  * reach the end of the relation. This algorithm is from Jeff Vitter's
1083                  * paper (see more info in commands/analyze.c).
1084                  */
1085                 if (numrows < targrows)
1086                 {
1087                         rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1088                 }
1089                 else
1090                 {
1091                         /*
1092                          * t in Vitter's paper is the number of records already processed.
1093                          * If we need to compute a new S value, we must use the
1094                          * not-yet-incremented value of totalrows as t.
1095                          */
1096                         if (rowstoskip < 0)
1097                                 rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);
1098
1099                         if (rowstoskip <= 0)
1100                         {
1101                                 /*
1102                                  * Found a suitable tuple, so save it, replacing one old tuple
1103                                  * at random
1104                                  */
1105                                 int                     k = (int) (targrows * sampler_random_fract(rstate.randstate));
1106
1107                                 Assert(k >= 0 && k < targrows);
1108                                 heap_freetuple(rows[k]);
1109                                 rows[k] = heap_form_tuple(tupDesc, values, nulls);
1110                         }
1111
1112                         rowstoskip -= 1;
1113                 }
1114
1115                 *totalrows += 1;
1116         }
1117
1118         /* Remove error callback. */
1119         error_context_stack = errcallback.previous;
1120
1121         /* Clean up. */
1122         MemoryContextDelete(tupcontext);
1123
1124         EndCopyFrom(cstate);
1125
1126         pfree(values);
1127         pfree(nulls);
1128
1129         /*
1130          * Emit some interesting relation info
1131          */
1132         ereport(elevel,
1133                         (errmsg("\"%s\": file contains %.0f rows; "
1134                                         "%d rows in sample",
1135                                         RelationGetRelationName(onerel),
1136                                         *totalrows, numrows)));
1137
1138         return numrows;
1139 }