]> granicus.if.org Git - postgresql/blob - contrib/file_fdw/file_fdw.c
Add support for piping COPY to/from an external program.
[postgresql] / contrib / file_fdw / file_fdw.c
1 /*-------------------------------------------------------------------------
2  *
3  * file_fdw.c
4  *                foreign-data wrapper for server-side flat files.
5  *
6  * Copyright (c) 2010-2013, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *                contrib/file_fdw/file_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14
15 #include <sys/stat.h>
16 #include <unistd.h>
17
18 #include "access/htup_details.h"
19 #include "access/reloptions.h"
20 #include "access/sysattr.h"
21 #include "catalog/pg_foreign_table.h"
22 #include "commands/copy.h"
23 #include "commands/defrem.h"
24 #include "commands/explain.h"
25 #include "commands/vacuum.h"
26 #include "foreign/fdwapi.h"
27 #include "foreign/foreign.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "optimizer/cost.h"
31 #include "optimizer/pathnode.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37
38 PG_MODULE_MAGIC;
39
40 /*
41  * Describes the valid options for objects that use this wrapper.
42  */
43 struct FileFdwOption
44 {
45         const char *optname;
46         Oid                     optcontext;             /* Oid of catalog in which option may appear */
47 };
48
49 /*
50  * Valid options for file_fdw.
51  * These options are based on the options for COPY FROM command.
52  * But note that force_not_null is handled as a boolean option attached to
53  * each column, not as a table option.
54  *
55  * Note: If you are adding new option for user mapping, you need to modify
56  * fileGetOptions(), which currently doesn't bother to look at user mappings.
57  */
58 static const struct FileFdwOption valid_options[] = {
59         /* File options */
60         {"filename", ForeignTableRelationId},
61
62         /* Format options */
63         /* oids option is not supported */
64         {"format", ForeignTableRelationId},
65         {"header", ForeignTableRelationId},
66         {"delimiter", ForeignTableRelationId},
67         {"quote", ForeignTableRelationId},
68         {"escape", ForeignTableRelationId},
69         {"null", ForeignTableRelationId},
70         {"encoding", ForeignTableRelationId},
71         {"force_not_null", AttributeRelationId},
72
73         /*
74          * force_quote is not supported by file_fdw because it's for COPY TO.
75          */
76
77         /* Sentinel */
78         {NULL, InvalidOid}
79 };
80
81 /*
82  * FDW-specific information for RelOptInfo.fdw_private.
83  */
84 typedef struct FileFdwPlanState
85 {
86         char       *filename;           /* file to read */
87         List       *options;            /* merged COPY options, excluding filename */
88         BlockNumber pages;                      /* estimate of file's physical size */
89         double          ntuples;                /* estimate of number of rows in file */
90 } FileFdwPlanState;
91
92 /*
93  * FDW-specific information for ForeignScanState.fdw_state.
94  */
95 typedef struct FileFdwExecutionState
96 {
97         char       *filename;           /* file to read */
98         List       *options;            /* merged COPY options, excluding filename */
99         CopyState       cstate;                 /* state of reading file */
100 } FileFdwExecutionState;
101
102 /*
103  * SQL functions
104  */
105 extern Datum file_fdw_handler(PG_FUNCTION_ARGS);
106 extern Datum file_fdw_validator(PG_FUNCTION_ARGS);
107
108 PG_FUNCTION_INFO_V1(file_fdw_handler);
109 PG_FUNCTION_INFO_V1(file_fdw_validator);
110
111 /*
112  * FDW callback routines
113  */
114 static void fileGetForeignRelSize(PlannerInfo *root,
115                                           RelOptInfo *baserel,
116                                           Oid foreigntableid);
117 static void fileGetForeignPaths(PlannerInfo *root,
118                                         RelOptInfo *baserel,
119                                         Oid foreigntableid);
120 static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
121                                    RelOptInfo *baserel,
122                                    Oid foreigntableid,
123                                    ForeignPath *best_path,
124                                    List *tlist,
125                                    List *scan_clauses);
126 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
127 static void fileBeginForeignScan(ForeignScanState *node, int eflags);
128 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
129 static void fileReScanForeignScan(ForeignScanState *node);
130 static void fileEndForeignScan(ForeignScanState *node);
131 static bool fileAnalyzeForeignTable(Relation relation,
132                                                 AcquireSampleRowsFunc *func,
133                                                 BlockNumber *totalpages);
134
135 /*
136  * Helper functions
137  */
138 static bool is_valid_option(const char *option, Oid context);
139 static void fileGetOptions(Oid foreigntableid,
140                            char **filename, List **other_options);
141 static List *get_file_fdw_attribute_options(Oid relid);
142 static bool check_selective_binary_conversion(RelOptInfo *baserel,
143                                                                                           Oid foreigntableid,
144                                                                                           List **columns);
145 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
146                           FileFdwPlanState *fdw_private);
147 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
148                            FileFdwPlanState *fdw_private,
149                            Cost *startup_cost, Cost *total_cost);
150 static int file_acquire_sample_rows(Relation onerel, int elevel,
151                                                  HeapTuple *rows, int targrows,
152                                                  double *totalrows, double *totaldeadrows);
153
154
155 /*
156  * Foreign-data wrapper handler function: return a struct with pointers
157  * to my callback routines.
158  */
159 Datum
160 file_fdw_handler(PG_FUNCTION_ARGS)
161 {
162         FdwRoutine *fdwroutine = makeNode(FdwRoutine);
163
164         fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
165         fdwroutine->GetForeignPaths = fileGetForeignPaths;
166         fdwroutine->GetForeignPlan = fileGetForeignPlan;
167         fdwroutine->ExplainForeignScan = fileExplainForeignScan;
168         fdwroutine->BeginForeignScan = fileBeginForeignScan;
169         fdwroutine->IterateForeignScan = fileIterateForeignScan;
170         fdwroutine->ReScanForeignScan = fileReScanForeignScan;
171         fdwroutine->EndForeignScan = fileEndForeignScan;
172         fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
173
174         PG_RETURN_POINTER(fdwroutine);
175 }
176
177 /*
178  * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
179  * USER MAPPING or FOREIGN TABLE that uses file_fdw.
180  *
181  * Raise an ERROR if the option or its value is considered invalid.
182  */
183 Datum
184 file_fdw_validator(PG_FUNCTION_ARGS)
185 {
186         List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
187         Oid                     catalog = PG_GETARG_OID(1);
188         char       *filename = NULL;
189         DefElem    *force_not_null = NULL;
190         List       *other_options = NIL;
191         ListCell   *cell;
192
193         /*
194          * Only superusers are allowed to set options of a file_fdw foreign table.
195          * This is because the filename is one of those options, and we don't want
196          * non-superusers to be able to determine which file gets read.
197          *
198          * Putting this sort of permissions check in a validator is a bit of a
199          * crock, but there doesn't seem to be any other place that can enforce
200          * the check more cleanly.
201          *
202          * Note that the valid_options[] array disallows setting filename at any
203          * options level other than foreign table --- otherwise there'd still be a
204          * security hole.
205          */
206         if (catalog == ForeignTableRelationId && !superuser())
207                 ereport(ERROR,
208                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
209                                  errmsg("only superuser can change options of a file_fdw foreign table")));
210
211         /*
212          * Check that only options supported by file_fdw, and allowed for the
213          * current object type, are given.
214          */
215         foreach(cell, options_list)
216         {
217                 DefElem    *def = (DefElem *) lfirst(cell);
218
219                 if (!is_valid_option(def->defname, catalog))
220                 {
221                         const struct FileFdwOption *opt;
222                         StringInfoData buf;
223
224                         /*
225                          * Unknown option specified, complain about it. Provide a hint
226                          * with list of valid options for the object.
227                          */
228                         initStringInfo(&buf);
229                         for (opt = valid_options; opt->optname; opt++)
230                         {
231                                 if (catalog == opt->optcontext)
232                                         appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
233                                                                          opt->optname);
234                         }
235
236                         ereport(ERROR,
237                                         (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
238                                          errmsg("invalid option \"%s\"", def->defname),
239                                          buf.len > 0
240                                          ? errhint("Valid options in this context are: %s",
241                                                            buf.data)
242                                   : errhint("There are no valid options in this context.")));
243                 }
244
245                 /*
246                  * Separate out filename and force_not_null, since ProcessCopyOptions
247                  * won't accept them.  (force_not_null only comes in a boolean
248                  * per-column flavor here.)
249                  */
250                 if (strcmp(def->defname, "filename") == 0)
251                 {
252                         if (filename)
253                                 ereport(ERROR,
254                                                 (errcode(ERRCODE_SYNTAX_ERROR),
255                                                  errmsg("conflicting or redundant options")));
256                         filename = defGetString(def);
257                 }
258                 else if (strcmp(def->defname, "force_not_null") == 0)
259                 {
260                         if (force_not_null)
261                                 ereport(ERROR,
262                                                 (errcode(ERRCODE_SYNTAX_ERROR),
263                                                  errmsg("conflicting or redundant options")));
264                         force_not_null = def;
265                         /* Don't care what the value is, as long as it's a legal boolean */
266                         (void) defGetBoolean(def);
267                 }
268                 else
269                         other_options = lappend(other_options, def);
270         }
271
272         /*
273          * Now apply the core COPY code's validation logic for more checks.
274          */
275         ProcessCopyOptions(NULL, true, other_options);
276
277         /*
278          * Filename option is required for file_fdw foreign tables.
279          */
280         if (catalog == ForeignTableRelationId && filename == NULL)
281                 ereport(ERROR,
282                                 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
283                                  errmsg("filename is required for file_fdw foreign tables")));
284
285         PG_RETURN_VOID();
286 }
287
288 /*
289  * Check if the provided option is one of the valid options.
290  * context is the Oid of the catalog holding the object the option is for.
291  */
292 static bool
293 is_valid_option(const char *option, Oid context)
294 {
295         const struct FileFdwOption *opt;
296
297         for (opt = valid_options; opt->optname; opt++)
298         {
299                 if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
300                         return true;
301         }
302         return false;
303 }
304
305 /*
306  * Fetch the options for a file_fdw foreign table.
307  *
308  * We have to separate out "filename" from the other options because
309  * it must not appear in the options list passed to the core COPY code.
310  */
311 static void
312 fileGetOptions(Oid foreigntableid,
313                            char **filename, List **other_options)
314 {
315         ForeignTable *table;
316         ForeignServer *server;
317         ForeignDataWrapper *wrapper;
318         List       *options;
319         ListCell   *lc,
320                            *prev;
321
322         /*
323          * Extract options from FDW objects.  We ignore user mappings because
324          * file_fdw doesn't have any options that can be specified there.
325          *
326          * (XXX Actually, given the current contents of valid_options[], there's
327          * no point in examining anything except the foreign table's own options.
328          * Simplify?)
329          */
330         table = GetForeignTable(foreigntableid);
331         server = GetForeignServer(table->serverid);
332         wrapper = GetForeignDataWrapper(server->fdwid);
333
334         options = NIL;
335         options = list_concat(options, wrapper->options);
336         options = list_concat(options, server->options);
337         options = list_concat(options, table->options);
338         options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
339
340         /*
341          * Separate out the filename.
342          */
343         *filename = NULL;
344         prev = NULL;
345         foreach(lc, options)
346         {
347                 DefElem    *def = (DefElem *) lfirst(lc);
348
349                 if (strcmp(def->defname, "filename") == 0)
350                 {
351                         *filename = defGetString(def);
352                         options = list_delete_cell(options, lc, prev);
353                         break;
354                 }
355                 prev = lc;
356         }
357
358         /*
359          * The validator should have checked that a filename was included in the
360          * options, but check again, just in case.
361          */
362         if (*filename == NULL)
363                 elog(ERROR, "filename is required for file_fdw foreign tables");
364
365         *other_options = options;
366 }
367
368 /*
369  * Retrieve per-column generic options from pg_attribute and construct a list
370  * of DefElems representing them.
371  *
372  * At the moment we only have "force_not_null", which should be combined into
373  * a single DefElem listing all such columns, since that's what COPY expects.
374  */
375 static List *
376 get_file_fdw_attribute_options(Oid relid)
377 {
378         Relation        rel;
379         TupleDesc       tupleDesc;
380         AttrNumber      natts;
381         AttrNumber      attnum;
382         List       *fnncolumns = NIL;
383
384         rel = heap_open(relid, AccessShareLock);
385         tupleDesc = RelationGetDescr(rel);
386         natts = tupleDesc->natts;
387
388         /* Retrieve FDW options for all user-defined attributes. */
389         for (attnum = 1; attnum <= natts; attnum++)
390         {
391                 Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
392                 List       *options;
393                 ListCell   *lc;
394
395                 /* Skip dropped attributes. */
396                 if (attr->attisdropped)
397                         continue;
398
399                 options = GetForeignColumnOptions(relid, attnum);
400                 foreach(lc, options)
401                 {
402                         DefElem    *def = (DefElem *) lfirst(lc);
403
404                         if (strcmp(def->defname, "force_not_null") == 0)
405                         {
406                                 if (defGetBoolean(def))
407                                 {
408                                         char       *attname = pstrdup(NameStr(attr->attname));
409
410                                         fnncolumns = lappend(fnncolumns, makeString(attname));
411                                 }
412                         }
413                         /* maybe in future handle other options here */
414                 }
415         }
416
417         heap_close(rel, AccessShareLock);
418
419         /* Return DefElem only when some column(s) have force_not_null */
420         if (fnncolumns != NIL)
421                 return list_make1(makeDefElem("force_not_null", (Node *) fnncolumns));
422         else
423                 return NIL;
424 }
425
426 /*
427  * fileGetForeignRelSize
428  *              Obtain relation size estimates for a foreign table
429  */
430 static void
431 fileGetForeignRelSize(PlannerInfo *root,
432                                           RelOptInfo *baserel,
433                                           Oid foreigntableid)
434 {
435         FileFdwPlanState *fdw_private;
436
437         /*
438          * Fetch options.  We only need filename at this point, but we might as
439          * well get everything and not need to re-fetch it later in planning.
440          */
441         fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
442         fileGetOptions(foreigntableid,
443                                    &fdw_private->filename, &fdw_private->options);
444         baserel->fdw_private = (void *) fdw_private;
445
446         /* Estimate relation size */
447         estimate_size(root, baserel, fdw_private);
448 }
449
450 /*
451  * fileGetForeignPaths
452  *              Create possible access paths for a scan on the foreign table
453  *
454  *              Currently we don't support any push-down feature, so there is only one
455  *              possible access path, which simply returns all records in the order in
456  *              the data file.
457  */
458 static void
459 fileGetForeignPaths(PlannerInfo *root,
460                                         RelOptInfo *baserel,
461                                         Oid foreigntableid)
462 {
463         FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
464         Cost            startup_cost;
465         Cost            total_cost;
466         List       *columns;
467         List       *coptions = NIL;
468
469         /* Decide whether to selectively perform binary conversion */
470         if (check_selective_binary_conversion(baserel,
471                                                                                   foreigntableid,
472                                                                                   &columns))
473                 coptions = list_make1(makeDefElem("convert_selectively",
474                                                                                   (Node *) columns));
475
476         /* Estimate costs */
477         estimate_costs(root, baserel, fdw_private,
478                                    &startup_cost, &total_cost);
479
480         /*
481          * Create a ForeignPath node and add it as only possible path.  We use the
482          * fdw_private list of the path to carry the convert_selectively option;
483          * it will be propagated into the fdw_private list of the Plan node.
484          */
485         add_path(baserel, (Path *)
486                          create_foreignscan_path(root, baserel,
487                                                                          baserel->rows,
488                                                                          startup_cost,
489                                                                          total_cost,
490                                                                          NIL,           /* no pathkeys */
491                                                                          NULL,          /* no outer rel either */
492                                                                          coptions));
493
494         /*
495          * If data file was sorted, and we knew it somehow, we could insert
496          * appropriate pathkeys into the ForeignPath node to tell the planner
497          * that.
498          */
499 }
500
501 /*
502  * fileGetForeignPlan
503  *              Create a ForeignScan plan node for scanning the foreign table
504  */
505 static ForeignScan *
506 fileGetForeignPlan(PlannerInfo *root,
507                                    RelOptInfo *baserel,
508                                    Oid foreigntableid,
509                                    ForeignPath *best_path,
510                                    List *tlist,
511                                    List *scan_clauses)
512 {
513         Index           scan_relid = baserel->relid;
514
515         /*
516          * We have no native ability to evaluate restriction clauses, so we just
517          * put all the scan_clauses into the plan node's qual list for the
518          * executor to check.  So all we have to do here is strip RestrictInfo
519          * nodes from the clauses and ignore pseudoconstants (which will be
520          * handled elsewhere).
521          */
522         scan_clauses = extract_actual_clauses(scan_clauses, false);
523
524         /* Create the ForeignScan node */
525         return make_foreignscan(tlist,
526                                                         scan_clauses,
527                                                         scan_relid,
528                                                         NIL,    /* no expressions to evaluate */
529                                                         best_path->fdw_private);
530 }
531
532 /*
533  * fileExplainForeignScan
534  *              Produce extra output for EXPLAIN
535  */
536 static void
537 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
538 {
539         char       *filename;
540         List       *options;
541
542         /* Fetch options --- we only need filename at this point */
543         fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
544                                    &filename, &options);
545
546         ExplainPropertyText("Foreign File", filename, es);
547
548         /* Suppress file size if we're not showing cost details */
549         if (es->costs)
550         {
551                 struct stat stat_buf;
552
553                 if (stat(filename, &stat_buf) == 0)
554                         ExplainPropertyLong("Foreign File Size", (long) stat_buf.st_size,
555                                                                 es);
556         }
557 }
558
559 /*
560  * fileBeginForeignScan
561  *              Initiate access to the file by creating CopyState
562  */
563 static void
564 fileBeginForeignScan(ForeignScanState *node, int eflags)
565 {
566         ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
567         char       *filename;
568         List       *options;
569         CopyState       cstate;
570         FileFdwExecutionState *festate;
571
572         /*
573          * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
574          */
575         if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
576                 return;
577
578         /* Fetch options of foreign table */
579         fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
580                                    &filename, &options);
581
582         /* Add any options from the plan (currently only convert_selectively) */
583         options = list_concat(options, plan->fdw_private);
584
585         /*
586          * Create CopyState from FDW options.  We always acquire all columns, so
587          * as to match the expected ScanTupleSlot signature.
588          */
589         cstate = BeginCopyFrom(node->ss.ss_currentRelation,
590                                                    filename,
591                                                    false,
592                                                    NIL,
593                                                    options);
594
595         /*
596          * Save state in node->fdw_state.  We must save enough information to call
597          * BeginCopyFrom() again.
598          */
599         festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
600         festate->filename = filename;
601         festate->options = options;
602         festate->cstate = cstate;
603
604         node->fdw_state = (void *) festate;
605 }
606
607 /*
608  * fileIterateForeignScan
609  *              Read next record from the data file and store it into the
610  *              ScanTupleSlot as a virtual tuple
611  */
612 static TupleTableSlot *
613 fileIterateForeignScan(ForeignScanState *node)
614 {
615         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
616         TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
617         bool            found;
618         ErrorContextCallback errcallback;
619
620         /* Set up callback to identify error line number. */
621         errcallback.callback = CopyFromErrorCallback;
622         errcallback.arg = (void *) festate->cstate;
623         errcallback.previous = error_context_stack;
624         error_context_stack = &errcallback;
625
626         /*
627          * The protocol for loading a virtual tuple into a slot is first
628          * ExecClearTuple, then fill the values/isnull arrays, then
629          * ExecStoreVirtualTuple.  If we don't find another row in the file, we
630          * just skip the last step, leaving the slot empty as required.
631          *
632          * We can pass ExprContext = NULL because we read all columns from the
633          * file, so no need to evaluate default expressions.
634          *
635          * We can also pass tupleOid = NULL because we don't allow oids for
636          * foreign tables.
637          */
638         ExecClearTuple(slot);
639         found = NextCopyFrom(festate->cstate, NULL,
640                                                  slot->tts_values, slot->tts_isnull,
641                                                  NULL);
642         if (found)
643                 ExecStoreVirtualTuple(slot);
644
645         /* Remove error callback. */
646         error_context_stack = errcallback.previous;
647
648         return slot;
649 }
650
651 /*
652  * fileReScanForeignScan
653  *              Rescan table, possibly with new parameters
654  */
655 static void
656 fileReScanForeignScan(ForeignScanState *node)
657 {
658         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
659
660         EndCopyFrom(festate->cstate);
661
662         festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
663                                                                         festate->filename,
664                                                                         false,
665                                                                         NIL,
666                                                                         festate->options);
667 }
668
669 /*
670  * fileEndForeignScan
671  *              Finish scanning foreign table and dispose objects used for this scan
672  */
673 static void
674 fileEndForeignScan(ForeignScanState *node)
675 {
676         FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
677
678         /* if festate is NULL, we are in EXPLAIN; nothing to do */
679         if (festate)
680                 EndCopyFrom(festate->cstate);
681 }
682
683 /*
684  * fileAnalyzeForeignTable
685  *              Test whether analyzing this foreign table is supported
686  */
687 static bool
688 fileAnalyzeForeignTable(Relation relation,
689                                                 AcquireSampleRowsFunc *func,
690                                                 BlockNumber *totalpages)
691 {
692         char       *filename;
693         List       *options;
694         struct stat stat_buf;
695
696         /* Fetch options of foreign table */
697         fileGetOptions(RelationGetRelid(relation), &filename, &options);
698
699         /*
700          * Get size of the file.  (XXX if we fail here, would it be better to just
701          * return false to skip analyzing the table?)
702          */
703         if (stat(filename, &stat_buf) < 0)
704                 ereport(ERROR,
705                                 (errcode_for_file_access(),
706                                  errmsg("could not stat file \"%s\": %m",
707                                                 filename)));
708
709         /*
710          * Convert size to pages.  Must return at least 1 so that we can tell
711          * later on that pg_class.relpages is not default.
712          */
713         *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
714         if (*totalpages < 1)
715                 *totalpages = 1;
716
717         *func = file_acquire_sample_rows;
718
719         return true;
720 }
721
722 /*
723  * check_selective_binary_conversion
724  *
725  * Check to see if it's useful to convert only a subset of the file's columns
726  * to binary.  If so, construct a list of the column names to be converted,
727  * return that at *columns, and return TRUE.  (Note that it's possible to
728  * determine that no columns need be converted, for instance with a COUNT(*)
729  * query.  So we can't use returning a NIL list to indicate failure.)
730  */
731 static bool
732 check_selective_binary_conversion(RelOptInfo *baserel,
733                                                                   Oid foreigntableid,
734                                                                   List **columns)
735 {
736         ForeignTable *table;
737         ListCell   *lc;
738         Relation        rel;
739         TupleDesc       tupleDesc;
740         AttrNumber      attnum;
741         Bitmapset  *attrs_used = NULL;
742         bool            has_wholerow = false;
743         int                     numattrs;
744         int                     i;
745
746         *columns = NIL;                         /* default result */
747
748         /*
749          * Check format of the file.  If binary format, this is irrelevant.
750          */
751         table = GetForeignTable(foreigntableid);
752         foreach(lc, table->options)
753         {
754                 DefElem    *def = (DefElem *) lfirst(lc);
755
756                 if (strcmp(def->defname, "format") == 0)
757                 {
758                         char       *format = defGetString(def);
759
760                         if (strcmp(format, "binary") == 0)
761                                 return false;
762                         break;
763                 }
764         }
765
766         /* Collect all the attributes needed for joins or final output. */
767         pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
768                                    &attrs_used);
769
770         /* Add all the attributes used by restriction clauses. */
771         foreach(lc, baserel->baserestrictinfo)
772         {
773                 RestrictInfo   *rinfo = (RestrictInfo *) lfirst(lc);
774
775                 pull_varattnos((Node *) rinfo->clause, baserel->relid,
776                                            &attrs_used);
777         }
778
779         /* Convert attribute numbers to column names. */
780         rel = heap_open(foreigntableid, AccessShareLock);
781         tupleDesc = RelationGetDescr(rel);
782
783         while ((attnum = bms_first_member(attrs_used)) >= 0)
784         {
785                 /* Adjust for system attributes. */
786                 attnum += FirstLowInvalidHeapAttributeNumber;
787
788                 if (attnum == 0)
789                 {
790                         has_wholerow = true;
791                         break;
792                 }
793
794                 /* Ignore system attributes. */
795                 if (attnum < 0)
796                         continue;
797
798                 /* Get user attributes. */
799                 if (attnum > 0)
800                 {
801                         Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
802                         char       *attname = NameStr(attr->attname);
803
804                         /* Skip dropped attributes (probably shouldn't see any here). */
805                         if (attr->attisdropped)
806                                 continue;
807                         *columns = lappend(*columns, makeString(pstrdup(attname)));
808                 }
809         }
810
811         /* Count non-dropped user attributes while we have the tupdesc. */
812         numattrs = 0;
813         for (i = 0; i < tupleDesc->natts; i++)
814         {
815                 Form_pg_attribute attr = tupleDesc->attrs[i];
816
817                 if (attr->attisdropped)
818                         continue;
819                 numattrs++;
820         }
821
822         heap_close(rel, AccessShareLock);
823
824         /* If there's a whole-row reference, fail: we need all the columns. */
825         if (has_wholerow)
826         {
827                 *columns = NIL;
828                 return false;
829         }
830
831         /* If all the user attributes are needed, fail. */
832         if (numattrs == list_length(*columns))
833         {
834                 *columns = NIL;
835                 return false;
836         }
837
838         return true;
839 }
840
841 /*
842  * Estimate size of a foreign table.
843  *
844  * The main result is returned in baserel->rows.  We also set
845  * fdw_private->pages and fdw_private->ntuples for later use in the cost
846  * calculation.
847  */
848 static void
849 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
850                           FileFdwPlanState *fdw_private)
851 {
852         struct stat stat_buf;
853         BlockNumber pages;
854         double          ntuples;
855         double          nrows;
856
857         /*
858          * Get size of the file.  It might not be there at plan time, though, in
859          * which case we have to use a default estimate.
860          */
861         if (stat(fdw_private->filename, &stat_buf) < 0)
862                 stat_buf.st_size = 10 * BLCKSZ;
863
864         /*
865          * Convert size to pages for use in I/O cost estimate later.
866          */
867         pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
868         if (pages < 1)
869                 pages = 1;
870         fdw_private->pages = pages;
871
872         /*
873          * Estimate the number of tuples in the file.
874          */
875         if (baserel->pages > 0)
876         {
877                 /*
878                  * We have # of pages and # of tuples from pg_class (that is, from a
879                  * previous ANALYZE), so compute a tuples-per-page estimate and scale
880                  * that by the current file size.
881                  */
882                 double          density;
883
884                 density = baserel->tuples / (double) baserel->pages;
885                 ntuples = clamp_row_est(density * (double) pages);
886         }
887         else
888         {
889                 /*
890                  * Otherwise we have to fake it.  We back into this estimate using the
891                  * planner's idea of the relation width; which is bogus if not all
892                  * columns are being read, not to mention that the text representation
893                  * of a row probably isn't the same size as its internal
894                  * representation.      Possibly we could do something better, but the
895                  * real answer to anyone who complains is "ANALYZE" ...
896                  */
897                 int                     tuple_width;
898
899                 tuple_width = MAXALIGN(baserel->width) +
900                         MAXALIGN(sizeof(HeapTupleHeaderData));
901                 ntuples = clamp_row_est((double) stat_buf.st_size /
902                                                                 (double) tuple_width);
903         }
904         fdw_private->ntuples = ntuples;
905
906         /*
907          * Now estimate the number of rows returned by the scan after applying the
908          * baserestrictinfo quals.
909          */
910         nrows = ntuples *
911                 clauselist_selectivity(root,
912                                                            baserel->baserestrictinfo,
913                                                            0,
914                                                            JOIN_INNER,
915                                                            NULL);
916
917         nrows = clamp_row_est(nrows);
918
919         /* Save the output-rows estimate for the planner */
920         baserel->rows = nrows;
921 }
922
923 /*
924  * Estimate costs of scanning a foreign table.
925  *
926  * Results are returned in *startup_cost and *total_cost.
927  */
928 static void
929 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
930                            FileFdwPlanState *fdw_private,
931                            Cost *startup_cost, Cost *total_cost)
932 {
933         BlockNumber pages = fdw_private->pages;
934         double          ntuples = fdw_private->ntuples;
935         Cost            run_cost = 0;
936         Cost            cpu_per_tuple;
937
938         /*
939          * We estimate costs almost the same way as cost_seqscan(), thus assuming
940          * that I/O costs are equivalent to a regular table file of the same size.
941          * However, we take per-tuple CPU costs as 10x of a seqscan, to account
942          * for the cost of parsing records.
943          */
944         run_cost += seq_page_cost * pages;
945
946         *startup_cost = baserel->baserestrictcost.startup;
947         cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
948         run_cost += cpu_per_tuple * ntuples;
949         *total_cost = *startup_cost + run_cost;
950 }
951
952 /*
953  * file_acquire_sample_rows -- acquire a random sample of rows from the table
954  *
955  * Selected rows are returned in the caller-allocated array rows[],
956  * which must have at least targrows entries.
957  * The actual number of rows selected is returned as the function result.
958  * We also count the total number of rows in the file and return it into
959  * *totalrows.  Note that *totaldeadrows is always set to 0.
960  *
961  * Note that the returned list of rows is not always in order by physical
962  * position in the file.  Therefore, correlation estimates derived later
963  * may be meaningless, but it's OK because we don't use the estimates
964  * currently (the planner only pays attention to correlation for indexscans).
965  */
966 static int
967 file_acquire_sample_rows(Relation onerel, int elevel,
968                                                  HeapTuple *rows, int targrows,
969                                                  double *totalrows, double *totaldeadrows)
970 {
971         int                     numrows = 0;
972         double          rowstoskip = -1;        /* -1 means not set yet */
973         double          rstate;
974         TupleDesc       tupDesc;
975         Datum      *values;
976         bool       *nulls;
977         bool            found;
978         char       *filename;
979         List       *options;
980         CopyState       cstate;
981         ErrorContextCallback errcallback;
982         MemoryContext oldcontext = CurrentMemoryContext;
983         MemoryContext tupcontext;
984
985         Assert(onerel);
986         Assert(targrows > 0);
987
988         tupDesc = RelationGetDescr(onerel);
989         values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
990         nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
991
992         /* Fetch options of foreign table */
993         fileGetOptions(RelationGetRelid(onerel), &filename, &options);
994
995         /*
996          * Create CopyState from FDW options.
997          */
998         cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
999
1000         /*
1001          * Use per-tuple memory context to prevent leak of memory used to read
1002          * rows from the file with Copy routines.
1003          */
1004         tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1005                                                                            "file_fdw temporary context",
1006                                                                            ALLOCSET_DEFAULT_MINSIZE,
1007                                                                            ALLOCSET_DEFAULT_INITSIZE,
1008                                                                            ALLOCSET_DEFAULT_MAXSIZE);
1009
1010         /* Prepare for sampling rows */
1011         rstate = anl_init_selection_state(targrows);
1012
1013         /* Set up callback to identify error line number. */
1014         errcallback.callback = CopyFromErrorCallback;
1015         errcallback.arg = (void *) cstate;
1016         errcallback.previous = error_context_stack;
1017         error_context_stack = &errcallback;
1018
1019         *totalrows = 0;
1020         *totaldeadrows = 0;
1021         for (;;)
1022         {
1023                 /* Check for user-requested abort or sleep */
1024                 vacuum_delay_point();
1025
1026                 /* Fetch next row */
1027                 MemoryContextReset(tupcontext);
1028                 MemoryContextSwitchTo(tupcontext);
1029
1030                 found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
1031
1032                 MemoryContextSwitchTo(oldcontext);
1033
1034                 if (!found)
1035                         break;
1036
1037                 /*
1038                  * The first targrows sample rows are simply copied into the
1039                  * reservoir.  Then we start replacing tuples in the sample until we
1040                  * reach the end of the relation. This algorithm is from Jeff Vitter's
1041                  * paper (see more info in commands/analyze.c).
1042                  */
1043                 if (numrows < targrows)
1044                 {
1045                         rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1046                 }
1047                 else
1048                 {
1049                         /*
1050                          * t in Vitter's paper is the number of records already processed.
1051                          * If we need to compute a new S value, we must use the
1052                          * not-yet-incremented value of totalrows as t.
1053                          */
1054                         if (rowstoskip < 0)
1055                                 rowstoskip = anl_get_next_S(*totalrows, targrows, &rstate);
1056
1057                         if (rowstoskip <= 0)
1058                         {
1059                                 /*
1060                                  * Found a suitable tuple, so save it, replacing one old tuple
1061                                  * at random
1062                                  */
1063                                 int                     k = (int) (targrows * anl_random_fract());
1064
1065                                 Assert(k >= 0 && k < targrows);
1066                                 heap_freetuple(rows[k]);
1067                                 rows[k] = heap_form_tuple(tupDesc, values, nulls);
1068                         }
1069
1070                         rowstoskip -= 1;
1071                 }
1072
1073                 *totalrows += 1;
1074         }
1075
1076         /* Remove error callback. */
1077         error_context_stack = errcallback.previous;
1078
1079         /* Clean up. */
1080         MemoryContextDelete(tupcontext);
1081
1082         EndCopyFrom(cstate);
1083
1084         pfree(values);
1085         pfree(nulls);
1086
1087         /*
1088          * Emit some interesting relation info
1089          */
1090         ereport(elevel,
1091                         (errmsg("\"%s\": file contains %.0f rows; "
1092                                         "%d rows in sample",
1093                                         RelationGetRelationName(onerel),
1094                                         *totalrows, numrows)));
1095
1096         return numrows;
1097 }