]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/copy.c
Fix reporting of violations in ExecConstraints, again.
[postgresql] / src / backend / commands / copy.c
index 28dcd340017a1fa53f3db76dd170f9014f738ce2..73677be59e03dc2d79806b07125e176a7671ec4e 100644 (file)
@@ -3,7 +3,7 @@
  * copy.c
  *             Implements the COPY utility command
  *
- * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
@@ -60,7 +60,8 @@ typedef enum CopyDest
 {
        COPY_FILE,                                      /* to/from file (or a piped program) */
        COPY_OLD_FE,                            /* to/from frontend (2.0 protocol) */
-       COPY_NEW_FE                                     /* to/from frontend (3.0 protocol) */
+       COPY_NEW_FE,                            /* to/from frontend (3.0 protocol) */
+       COPY_CALLBACK                           /* to/from callback function */
 } CopyDest;
 
 /*
@@ -109,6 +110,7 @@ typedef struct CopyStateData
        List       *attnumlist;         /* integer list of attnums to copy */
        char       *filename;           /* filename, or NULL for STDIN/STDOUT */
        bool            is_program;             /* is 'filename' a program to popen? */
+       copy_data_source_cb     data_source_cb;         /* function for reading data*/
        bool            binary;                 /* binary format? */
        bool            oids;                   /* include OIDs? */
        bool            freeze;                 /* freeze rows on loading? */
@@ -162,6 +164,13 @@ typedef struct CopyStateData
        bool            volatile_defexprs;              /* is any of defexprs volatile? */
        List       *range_table;
 
+       PartitionDispatch *partition_dispatch_info;
+       int                     num_dispatch;   /* Number of entries in the above array */
+       int                     num_partitions; /* Number of members in the following arrays */
+       ResultRelInfo *partitions;      /* Per partition result relation */
+       TupleConversionMap **partition_tupconv_maps;
+       TupleTableSlot *partition_tuple_slot;
+
        /*
         * These variables are used to reduce overhead in textual COPY FROM.
         *
@@ -279,20 +288,19 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
-                 const char *queryString, const Oid queryRelId, List *attnamelist,
+static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
+                 RawStmt *raw_query, Oid queryRelId, List *attnamelist,
                  List *options);
 static void EndCopy(CopyState cstate);
 static void ClosePipeToProgram(CopyState cstate);
-static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
-                       const Oid queryRelId, const char *filename, bool is_program,
+static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
+                       Oid queryRelId, const char *filename, bool is_program,
                        List *attnamelist, List *options);
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
                         Datum *values, bool *nulls);
-static uint64 CopyFrom(CopyState cstate);
 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
                                        CommandId mycid, int hi_options,
                                        ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
@@ -353,7 +361,7 @@ SendCopyBegin(CopyState cstate)
                pq_endmessage(&buf);
                cstate->copy_dest = COPY_NEW_FE;
        }
-       else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
+       else
        {
                /* old way */
                if (cstate->binary)
@@ -365,18 +373,6 @@ SendCopyBegin(CopyState cstate)
                pq_startcopyout();
                cstate->copy_dest = COPY_OLD_FE;
        }
-       else
-       {
-               /* very old way */
-               if (cstate->binary)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                       errmsg("COPY BINARY is not supported to stdout or from stdin")));
-               pq_putemptymessage('B');
-               /* grottiness needed for old COPY OUT protocol */
-               pq_startcopyout();
-               cstate->copy_dest = COPY_OLD_FE;
-       }
 }
 
 static void
@@ -397,9 +393,9 @@ ReceiveCopyBegin(CopyState cstate)
                        pq_sendint(&buf, format, 2);            /* per-column formats */
                pq_endmessage(&buf);
                cstate->copy_dest = COPY_NEW_FE;
-               cstate->fe_msgbuf = makeStringInfo();
+               cstate->fe_msgbuf = makeLongStringInfo();
        }
-       else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
+       else
        {
                /* old way */
                if (cstate->binary)
@@ -411,18 +407,6 @@ ReceiveCopyBegin(CopyState cstate)
                pq_startmsgread();
                cstate->copy_dest = COPY_OLD_FE;
        }
-       else
-       {
-               /* very old way */
-               if (cstate->binary)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                       errmsg("COPY BINARY is not supported to stdout or from stdin")));
-               pq_putemptymessage('D');
-               /* any error in old protocol will make us lose sync */
-               pq_startmsgread();
-               cstate->copy_dest = COPY_OLD_FE;
-       }
        /* We *must* flush here to ensure FE knows it can send. */
        pq_flush();
 }
@@ -546,6 +530,9 @@ CopySendEndOfRow(CopyState cstate)
                        /* Dump the accumulated row as one CopyData message */
                        (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
                        break;
+               case COPY_CALLBACK:
+                       Assert(false); /* Not yet supported. */
+                       break;
        }
 
        resetStringInfo(fe_msgbuf);
@@ -660,6 +647,9 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
                                bytesread += avail;
                        }
                        break;
+               case COPY_CALLBACK:
+                       bytesread = cstate->data_source_cb(databuf, minread, maxread);
+                       break;
        }
 
        return bytesread;
@@ -786,15 +776,17 @@ CopyLoadRawBuf(CopyState cstate)
  * Do not allow the copy if user doesn't have proper permission to access
  * the table or the specifically requested columns.
  */
-Oid
-DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
+void
+DoCopy(ParseState *pstate, const CopyStmt *stmt,
+          int stmt_location, int stmt_len,
+          uint64 *processed)
 {
        CopyState       cstate;
        bool            is_from = stmt->is_from;
        bool            pipe = (stmt->filename == NULL);
        Relation        rel;
        Oid                     relid;
-       Node       *query = NULL;
+       RawStmt    *query = NULL;
        List       *range_table = NIL;
 
        /* Disallow COPY to/from file or program except to superusers. */
@@ -871,42 +863,85 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
                        ColumnRef  *cr;
                        ResTarget  *target;
                        RangeVar   *from;
+                       List       *targetList = NIL;
 
                        if (is_from)
                                ereport(ERROR,
                                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                 errmsg("COPY FROM not supported with row-level security"),
+                                  errmsg("COPY FROM not supported with row-level security"),
                                                 errhint("Use INSERT statements instead.")));
 
-                       /* Build target list */
-                       cr = makeNode(ColumnRef);
-
+                       /*
+                        * Build target list
+                        *
+                        * If no columns are specified in the attribute list of the COPY
+                        * command, then the target list is 'all' columns. Therefore, '*'
+                        * should be used as the target list for the resulting SELECT
+                        * statement.
+                        *
+                        * In the case that columns are specified in the attribute list,
+                        * create a ColumnRef and ResTarget for each column and add them
+                        * to the target list for the resulting SELECT statement.
+                        */
                        if (!stmt->attlist)
+                       {
+                               cr = makeNode(ColumnRef);
                                cr->fields = list_make1(makeNode(A_Star));
-                       else
-                               cr->fields = stmt->attlist;
+                               cr->location = -1;
+
+                               target = makeNode(ResTarget);
+                               target->name = NULL;
+                               target->indirection = NIL;
+                               target->val = (Node *) cr;
+                               target->location = -1;
 
-                       cr->location = 1;
+                               targetList = list_make1(target);
+                       }
+                       else
+                       {
+                               ListCell   *lc;
 
-                       target = makeNode(ResTarget);
-                       target->name = NULL;
-                       target->indirection = NIL;
-                       target->val = (Node *) cr;
-                       target->location = 1;
+                               foreach(lc, stmt->attlist)
+                               {
+                                       /*
+                                        * Build the ColumnRef for each column.  The ColumnRef
+                                        * 'fields' property is a String 'Value' node (see
+                                        * nodes/value.h) that corresponds to the column name
+                                        * respectively.
+                                        */
+                                       cr = makeNode(ColumnRef);
+                                       cr->fields = list_make1(lfirst(lc));
+                                       cr->location = -1;
+
+                                       /* Build the ResTarget and add the ColumnRef to it. */
+                                       target = makeNode(ResTarget);
+                                       target->name = NULL;
+                                       target->indirection = NIL;
+                                       target->val = (Node *) cr;
+                                       target->location = -1;
+
+                                       /* Add each column to the SELECT statement's target list */
+                                       targetList = lappend(targetList, target);
+                               }
+                       }
 
                        /*
                         * Build RangeVar for from clause, fully qualified based on the
                         * relation which we have opened and locked.
                         */
                        from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
-                                                               RelationGetRelationName(rel), -1);
+                                                               pstrdup(RelationGetRelationName(rel)),
+                                                               -1);
 
                        /* Build query */
                        select = makeNode(SelectStmt);
-                       select->targetList = list_make1(target);
+                       select->targetList = targetList;
                        select->fromClause = list_make1(from);
 
-                       query = (Node *) select;
+                       query = makeNode(RawStmt);
+                       query->stmt = (Node *) select;
+                       query->stmt_location = stmt_location;
+                       query->stmt_len = stmt_len;
 
                        /*
                         * Close the relation for now, but keep the lock on it to prevent
@@ -922,7 +957,11 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
        {
                Assert(stmt->query);
 
-               query = stmt->query;
+               query = makeNode(RawStmt);
+               query->stmt = stmt->query;
+               query->stmt_location = stmt_location;
+               query->stmt_len = stmt_len;
+
                relid = InvalidOid;
                rel = NULL;
        }
@@ -936,15 +975,15 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
                        PreventCommandIfReadOnly("COPY FROM");
                PreventCommandIfParallelMode("COPY FROM");
 
-               cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
-                                                          stmt->attlist, stmt->options);
+               cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
+                                                          NULL, stmt->attlist, stmt->options);
                cstate->range_table = range_table;
                *processed = CopyFrom(cstate);  /* copy from file to database */
                EndCopyFrom(cstate);
        }
        else
        {
-               cstate = BeginCopyTo(rel, query, queryString, relid,
+               cstate = BeginCopyTo(pstate, rel, query, relid,
                                                         stmt->filename, stmt->is_program,
                                                         stmt->attlist, stmt->options);
                *processed = DoCopyTo(cstate);  /* copy from database to file */
@@ -958,8 +997,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
         */
        if (rel != NULL)
                heap_close(rel, (is_from ? NoLock : AccessShareLock));
-
-       return relid;
 }
 
 /*
@@ -980,7 +1017,8 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
  * self-consistency of the options list.
  */
 void
-ProcessCopyOptions(CopyState cstate,
+ProcessCopyOptions(ParseState *pstate,
+                                  CopyState cstate,
                                   bool is_from,
                                   List *options)
 {
@@ -996,7 +1034,7 @@ ProcessCopyOptions(CopyState cstate,
        /* Extract options from the statement node tree */
        foreach(option, options)
        {
-               DefElem    *defel = (DefElem *) lfirst(option);
+               DefElem    *defel = castNode(DefElem, lfirst(option));
 
                if (strcmp(defel->defname, "format") == 0)
                {
@@ -1005,7 +1043,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (format_specified)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        format_specified = true;
                        if (strcmp(fmt, "text") == 0)
                                 /* default format */ ;
@@ -1016,14 +1055,16 @@ ProcessCopyOptions(CopyState cstate,
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                                errmsg("COPY format \"%s\" not recognized", fmt)));
+                                                errmsg("COPY format \"%s\" not recognized", fmt),
+                                                parser_errposition(pstate, defel->location)));
                }
                else if (strcmp(defel->defname, "oids") == 0)
                {
                        if (cstate->oids)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->oids = defGetBoolean(defel);
                }
                else if (strcmp(defel->defname, "freeze") == 0)
@@ -1031,7 +1072,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->freeze)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->freeze = defGetBoolean(defel);
                }
                else if (strcmp(defel->defname, "delimiter") == 0)
@@ -1039,7 +1081,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->delim)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->delim = defGetString(defel);
                }
                else if (strcmp(defel->defname, "null") == 0)
@@ -1047,7 +1090,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->null_print)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->null_print = defGetString(defel);
                }
                else if (strcmp(defel->defname, "header") == 0)
@@ -1055,7 +1099,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->header_line)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->header_line = defGetBoolean(defel);
                }
                else if (strcmp(defel->defname, "quote") == 0)
@@ -1063,7 +1108,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->quote)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->quote = defGetString(defel);
                }
                else if (strcmp(defel->defname, "escape") == 0)
@@ -1071,7 +1117,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->escape)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->escape = defGetString(defel);
                }
                else if (strcmp(defel->defname, "force_quote") == 0)
@@ -1079,30 +1126,34 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->force_quote || cstate->force_quote_all)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        if (defel->arg && IsA(defel->arg, A_Star))
                                cstate->force_quote_all = true;
                        else if (defel->arg && IsA(defel->arg, List))
-                               cstate->force_quote = (List *) defel->arg;
+                               cstate->force_quote = castNode(List, defel->arg);
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("argument to option \"%s\" must be a list of column names",
-                                                               defel->defname)));
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
                }
                else if (strcmp(defel->defname, "force_not_null") == 0)
                {
                        if (cstate->force_notnull)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        if (defel->arg && IsA(defel->arg, List))
-                               cstate->force_notnull = (List *) defel->arg;
+                               cstate->force_notnull = castNode(List, defel->arg);
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("argument to option \"%s\" must be a list of column names",
-                                                               defel->defname)));
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
                }
                else if (strcmp(defel->defname, "force_null") == 0)
                {
@@ -1111,12 +1162,13 @@ ProcessCopyOptions(CopyState cstate,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
                        if (defel->arg && IsA(defel->arg, List))
-                               cstate->force_null = (List *) defel->arg;
+                               cstate->force_null = castNode(List, defel->arg);
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("argument to option \"%s\" must be a list of column names",
-                                                               defel->defname)));
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
                }
                else if (strcmp(defel->defname, "convert_selectively") == 0)
                {
@@ -1128,34 +1180,39 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->convert_selectively)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->convert_selectively = true;
                        if (defel->arg == NULL || IsA(defel->arg, List))
-                               cstate->convert_select = (List *) defel->arg;
+                               cstate->convert_select = castNode(List, defel->arg);
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("argument to option \"%s\" must be a list of column names",
-                                                               defel->defname)));
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
                }
                else if (strcmp(defel->defname, "encoding") == 0)
                {
                        if (cstate->file_encoding >= 0)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
                        if (cstate->file_encoding < 0)
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("argument to option \"%s\" must be a valid encoding name",
-                                                               defel->defname)));
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
                }
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
                                         errmsg("option \"%s\" not recognized",
-                                                       defel->defname)));
+                                                       defel->defname),
+                                        parser_errposition(pstate, defel->location)));
        }
 
        /*
@@ -1318,11 +1375,11 @@ ProcessCopyOptions(CopyState cstate,
  * NULL values as <null_print>.
  */
 static CopyState
-BeginCopy(bool is_from,
+BeginCopy(ParseState *pstate,
+                 bool is_from,
                  Relation rel,
-                 Node *raw_query,
-                 const char *queryString,
-                 const Oid queryRelId,
+                 RawStmt *raw_query,
+                 Oid queryRelId,
                  List *attnamelist,
                  List *options)
 {
@@ -1340,14 +1397,12 @@ BeginCopy(bool is_from,
         */
        cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
                                                                                                "COPY",
-                                                                                               ALLOCSET_DEFAULT_MINSIZE,
-                                                                                               ALLOCSET_DEFAULT_INITSIZE,
-                                                                                               ALLOCSET_DEFAULT_MAXSIZE);
+                                                                                               ALLOCSET_DEFAULT_SIZES);
 
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
        /* Extract options from the statement node tree */
-       ProcessCopyOptions(cstate, is_from, options);
+       ProcessCopyOptions(pstate, cstate, is_from, options);
 
        /* Process the source/target relation or query */
        if (rel)
@@ -1364,6 +1419,30 @@ BeginCopy(bool is_from,
                                        (errcode(ERRCODE_UNDEFINED_COLUMN),
                                         errmsg("table \"%s\" does not have OIDs",
                                                        RelationGetRelationName(cstate->rel))));
+
+               /* Initialize state for CopyFrom tuple routing. */
+               if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               {
+                       PartitionDispatch *partition_dispatch_info;
+                       ResultRelInfo *partitions;
+                       TupleConversionMap **partition_tupconv_maps;
+                       TupleTableSlot *partition_tuple_slot;
+                       int                     num_parted,
+                                               num_partitions;
+
+                       ExecSetupPartitionTupleRouting(rel,
+                                                                                  &partition_dispatch_info,
+                                                                                  &partitions,
+                                                                                  &partition_tupconv_maps,
+                                                                                  &partition_tuple_slot,
+                                                                                  &num_parted, &num_partitions);
+                       cstate->partition_dispatch_info = partition_dispatch_info;
+                       cstate->num_dispatch = num_parted;
+                       cstate->partitions = partitions;
+                       cstate->num_partitions = num_partitions;
+                       cstate->partition_tupconv_maps = partition_tupconv_maps;
+                       cstate->partition_tuple_slot = partition_tuple_slot;
+               }
        }
        else
        {
@@ -1391,24 +1470,25 @@ BeginCopy(bool is_from,
                 * function and is executed repeatedly.  (See also the same hack in
                 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
                 */
-               rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
-                                                                                  queryString, NULL, 0);
+               rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
+                                                                                  pstate->p_sourcetext, NULL, 0,
+                                                                                  NULL);
 
                /* check that we got back something we can work with */
                if (rewritten == NIL)
                {
                        ereport(ERROR,
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                       errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
+                        errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
                }
                else if (list_length(rewritten) > 1)
                {
-                       ListCell *lc;
+                       ListCell   *lc;
 
                        /* examine queries to determine which error message to issue */
                        foreach(lc, rewritten)
                        {
-                               Query     *q = (Query *) lfirst(lc);
+                               Query      *q = castNode(Query, lfirst(lc));
 
                                if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
                                        ereport(ERROR,
@@ -1417,7 +1497,7 @@ BeginCopy(bool is_from,
                                if (q->querySource == QSRC_NON_INSTEAD_RULE)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                                        errmsg("DO ALSO rules are not supported for the COPY")));
+                                       errmsg("DO ALSO rules are not supported for the COPY")));
                        }
 
                        ereport(ERROR,
@@ -1425,7 +1505,7 @@ BeginCopy(bool is_from,
                                         errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
                }
 
-               query = (Query *) linitial(rewritten);
+               query = castNode(Query, linitial(rewritten));
 
                /* The grammar allows SELECT INTO, but we don't support that */
                if (query->utilityStmt != NULL &&
@@ -1448,8 +1528,8 @@ BeginCopy(bool is_from,
                                   query->commandType == CMD_DELETE);
 
                        ereport(ERROR,
-                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                                errmsg("COPY query must have a RETURNING clause")));
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("COPY query must have a RETURNING clause")));
                }
 
                /* plan the query */
@@ -1492,10 +1572,10 @@ BeginCopy(bool is_from,
                ((DR_copy *) dest)->cstate = cstate;
 
                /* Create a QueryDesc requesting no output */
-               cstate->queryDesc = CreateQueryDesc(plan, queryString,
+               cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
                                                                                        GetActiveSnapshot(),
                                                                                        InvalidSnapshot,
-                                                                                       dest, NULL, 0);
+                                                                                       dest, NULL, NULL, 0);
 
                /*
                 * Call ExecutorStart to prepare the plan for execution.
@@ -1680,10 +1760,10 @@ EndCopy(CopyState cstate)
  * Setup CopyState to read tuples from a table or a query for COPY TO.
  */
 static CopyState
-BeginCopyTo(Relation rel,
-                       Node *query,
-                       const char *queryString,
-                       const Oid queryRelId,
+BeginCopyTo(ParseState *pstate,
+                       Relation rel,
+                       RawStmt *query,
+                       Oid queryRelId,
                        const char *filename,
                        bool is_program,
                        List *attnamelist,
@@ -1718,6 +1798,12 @@ BeginCopyTo(Relation rel,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("cannot copy from sequence \"%s\"",
                                                        RelationGetRelationName(rel))));
+               else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot copy from partitioned table \"%s\"",
+                                                       RelationGetRelationName(rel)),
+                                        errhint("Try the COPY (SELECT ...) TO variant.")));
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -1725,7 +1811,7 @@ BeginCopyTo(Relation rel,
                                                        RelationGetRelationName(rel))));
        }
 
-       cstate = BeginCopy(false, rel, query, queryString, queryRelId, attnamelist,
+       cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
                                           options);
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
@@ -1767,10 +1853,18 @@ BeginCopyTo(Relation rel,
                        cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
                        umask(oumask);
                        if (cstate->copy_file == NULL)
+                       {
+                               /* copy errno because ereport subfunctions might change it */
+                               int                     save_errno = errno;
+
                                ereport(ERROR,
                                                (errcode_for_file_access(),
                                                 errmsg("could not open file \"%s\" for writing: %m",
-                                                               cstate->filename)));
+                                                               cstate->filename),
+                                                (save_errno == ENOENT || save_errno == EACCES) ?
+                                                errhint("COPY TO instructs the PostgreSQL server process to write a file. "
+                                                                "You may want a client-side facility such as psql's \\copy.") : 0));
+                       }
 
                        if (fstat(fileno(cstate->copy_file), &st))
                                ereport(ERROR,
@@ -1866,7 +1960,7 @@ CopyTo(CopyState cstate)
        cstate->null_print_client = cstate->null_print;         /* default */
 
        /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
-       cstate->fe_msgbuf = makeStringInfo();
+       cstate->fe_msgbuf = makeLongStringInfo();
 
        /* Get info about the columns we need to process. */
        cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
@@ -1895,9 +1989,7 @@ CopyTo(CopyState cstate)
         */
        cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
                                                                                           "COPY TO",
-                                                                                          ALLOCSET_DEFAULT_MINSIZE,
-                                                                                          ALLOCSET_DEFAULT_INITSIZE,
-                                                                                          ALLOCSET_DEFAULT_MAXSIZE);
+                                                                                          ALLOCSET_DEFAULT_SIZES);
 
        if (cstate->binary)
        {
@@ -1983,7 +2075,7 @@ CopyTo(CopyState cstate)
        else
        {
                /* run the plan --- the dest receiver will send tuples */
-               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
                processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
        }
 
@@ -2202,7 +2294,7 @@ limit_printout_length(const char *str)
 /*
  * Copy FROM file to relation.
  */
-static uint64
+uint64
 CopyFrom(CopyState cstate)
 {
        HeapTuple       tuple;
@@ -2210,6 +2302,7 @@ CopyFrom(CopyState cstate)
        Datum      *values;
        bool       *nulls;
        ResultRelInfo *resultRelInfo;
+       ResultRelInfo *saved_resultRelInfo = NULL;
        EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
        ExprContext *econtext;
        TupleTableSlot *myslot;
@@ -2222,6 +2315,7 @@ CopyFrom(CopyState cstate)
        uint64          processed = 0;
        bool            useHeapMultiInsert;
        int                     nBufferedTuples = 0;
+       int                     prev_leaf_part_index = -1;
 
 #define MAX_BUFFERED_TUPLES 1000
        HeapTuple  *bufferedTuples = NULL;      /* initialize to silence warning */
@@ -2230,13 +2324,22 @@ CopyFrom(CopyState cstate)
 
        Assert(cstate->rel);
 
-       if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
+       /*
+        * The target must be a plain relation or have an INSTEAD OF INSERT row
+        * trigger.  (Currently, such triggers are only allowed on views, so we
+        * only hint about them in the view case.)
+        */
+       if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+               cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
+               !(cstate->rel->trigdesc &&
+                 cstate->rel->trigdesc->trig_insert_instead_row))
        {
                if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("cannot copy to view \"%s\"",
-                                                       RelationGetRelationName(cstate->rel))));
+                                                       RelationGetRelationName(cstate->rel)),
+                                        errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
                else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -2311,7 +2414,7 @@ CopyFrom(CopyState cstate)
         * earlier scan or command. This ensures that if this subtransaction
         * aborts then the frozen rows won't be visible after xact cleanup. Note
         * that the stronger test of exactly which subtransaction created it is
-        * crucial for correctness of this optimisation.
+        * crucial for correctness of this optimization.
         */
        if (cstate->freeze)
        {
@@ -2338,6 +2441,7 @@ CopyFrom(CopyState cstate)
        InitResultRelInfo(resultRelInfo,
                                          cstate->rel,
                                          1,            /* dummy rangetable index */
+                                         NULL,
                                          0);
 
        ExecOpenIndices(resultRelInfo, false);
@@ -2360,11 +2464,13 @@ CopyFrom(CopyState cstate)
         * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
         * expressions. Such triggers or expressions might query the table we're
         * inserting to, and act differently if the tuples that have already been
-        * processed and prepared for insertion are not there.
+        * processed and prepared for insertion are not there.  We also can't do
+        * it if the table is partitioned.
         */
        if ((resultRelInfo->ri_TrigDesc != NULL &&
                 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
                  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+               cstate->partition_dispatch_info != NULL ||
                cstate->volatile_defexprs)
        {
                useHeapMultiInsert = false;
@@ -2441,6 +2547,81 @@ CopyFrom(CopyState cstate)
                slot = myslot;
                ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
+               /* Determine the partition to heap_insert the tuple into */
+               if (cstate->partition_dispatch_info)
+               {
+                       int                     leaf_part_index;
+                       TupleConversionMap *map;
+
+                       /*
+                        * Away we go ... If we end up not finding a partition after all,
+                        * ExecFindPartition() does not return and errors out instead.
+                        * Otherwise, the returned value is to be used as an index into
+                        * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+                        * will get us the ResultRelInfo and TupleConversionMap for the
+                        * partition, respectively.
+                        */
+                       leaf_part_index = ExecFindPartition(resultRelInfo,
+                                                                                        cstate->partition_dispatch_info,
+                                                                                               slot,
+                                                                                               estate);
+                       Assert(leaf_part_index >= 0 &&
+                                  leaf_part_index < cstate->num_partitions);
+
+                       /*
+                        * If this tuple is mapped to a partition that is not same as the
+                        * previous one, we'd better make the bulk insert mechanism gets a
+                        * new buffer.
+                        */
+                       if (prev_leaf_part_index != leaf_part_index)
+                       {
+                               ReleaseBulkInsertStatePin(bistate);
+                               prev_leaf_part_index = leaf_part_index;
+                       }
+
+                       /*
+                        * Save the old ResultRelInfo and switch to the one corresponding
+                        * to the selected partition.
+                        */
+                       saved_resultRelInfo = resultRelInfo;
+                       resultRelInfo = cstate->partitions + leaf_part_index;
+
+                       /* We do not yet have a way to insert into a foreign partition */
+                       if (resultRelInfo->ri_FdwRoutine)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot route inserted tuples to a foreign table")));
+
+                       /*
+                        * For ExecInsertIndexTuples() to work on the partition's indexes
+                        */
+                       estate->es_result_relation_info = resultRelInfo;
+
+                       /*
+                        * We might need to convert from the parent rowtype to the
+                        * partition rowtype.
+                        */
+                       map = cstate->partition_tupconv_maps[leaf_part_index];
+                       if (map)
+                       {
+                               Relation        partrel = resultRelInfo->ri_RelationDesc;
+
+                               tuple = do_convert_tuple(tuple, map);
+
+                               /*
+                                * We must use the partition's tuple descriptor from this
+                                * point on.  Use a dedicated slot from this point on until
+                                * we're finished dealing with the partition.
+                                */
+                               slot = cstate->partition_tuple_slot;
+                               Assert(slot != NULL);
+                               ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
+                               ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+                       }
+
+                       tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+               }
+
                skip_tuple = false;
 
                /* BEFORE ROW INSERT Triggers */
@@ -2457,52 +2638,66 @@ CopyFrom(CopyState cstate)
 
                if (!skip_tuple)
                {
-                       /* Check the constraints of the tuple */
-                       if (cstate->rel->rd_att->constr)
-                               ExecConstraints(resultRelInfo, slot, estate);
-
-                       if (useHeapMultiInsert)
+                       if (resultRelInfo->ri_TrigDesc &&
+                               resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
                        {
-                               /* Add this tuple to the tuple buffer */
-                               if (nBufferedTuples == 0)
-                                       firstBufferedLineNo = cstate->cur_lineno;
-                               bufferedTuples[nBufferedTuples++] = tuple;
-                               bufferedTuplesSize += tuple->t_len;
-
-                               /*
-                                * If the buffer filled up, flush it. Also flush if the total
-                                * size of all the tuples in the buffer becomes large, to
-                                * avoid using large amounts of memory for the buffers when
-                                * the tuples are exceptionally wide.
-                                */
-                               if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-                                       bufferedTuplesSize > 65535)
-                               {
-                                       CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-                                                                               resultRelInfo, myslot, bistate,
-                                                                               nBufferedTuples, bufferedTuples,
-                                                                               firstBufferedLineNo);
-                                       nBufferedTuples = 0;
-                                       bufferedTuplesSize = 0;
-                               }
+                               /* Pass the data to the INSTEAD ROW INSERT trigger */
+                               ExecIRInsertTriggers(estate, resultRelInfo, slot);
                        }
                        else
                        {
-                               List       *recheckIndexes = NIL;
+                               /* Check the constraints of the tuple */
+                               if (cstate->rel->rd_att->constr ||
+                                       resultRelInfo->ri_PartitionCheck)
+                                       ExecConstraints(resultRelInfo, slot, estate);
+
+                               if (useHeapMultiInsert)
+                               {
+                                       /* Add this tuple to the tuple buffer */
+                                       if (nBufferedTuples == 0)
+                                               firstBufferedLineNo = cstate->cur_lineno;
+                                       bufferedTuples[nBufferedTuples++] = tuple;
+                                       bufferedTuplesSize += tuple->t_len;
+
+                                       /*
+                                        * If the buffer filled up, flush it.  Also flush if the
+                                        * total size of all the tuples in the buffer becomes
+                                        * large, to avoid using large amounts of memory for the
+                                        * buffer when the tuples are exceptionally wide.
+                                        */
+                                       if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+                                               bufferedTuplesSize > 65535)
+                                       {
+                                               CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                                                                       resultRelInfo, myslot, bistate,
+                                                                                       nBufferedTuples, bufferedTuples,
+                                                                                       firstBufferedLineNo);
+                                               nBufferedTuples = 0;
+                                               bufferedTuplesSize = 0;
+                                       }
+                               }
+                               else
+                               {
+                                       List       *recheckIndexes = NIL;
 
-                               /* OK, store the tuple and create index entries for it */
-                               heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+                                       /* OK, store the tuple and create index entries for it */
+                                       heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
+                                                               hi_options, bistate);
 
-                               if (resultRelInfo->ri_NumIndices > 0)
-                                       recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                                                                                estate, false, NULL,
-                                                                                                                  NIL);
+                                       if (resultRelInfo->ri_NumIndices > 0)
+                                               recheckIndexes = ExecInsertIndexTuples(slot,
+                                                                                                                       &(tuple->t_self),
+                                                                                                                          estate,
+                                                                                                                          false,
+                                                                                                                          NULL,
+                                                                                                                          NIL);
 
-                               /* AFTER ROW INSERT Triggers */
-                               ExecARInsertTriggers(estate, resultRelInfo, tuple,
-                                                                        recheckIndexes);
+                                       /* AFTER ROW INSERT Triggers */
+                                       ExecARInsertTriggers(estate, resultRelInfo, tuple,
+                                                                                recheckIndexes);
 
-                               list_free(recheckIndexes);
+                                       list_free(recheckIndexes);
+                               }
                        }
 
                        /*
@@ -2511,6 +2706,12 @@ CopyFrom(CopyState cstate)
                         * tuples inserted by an INSERT command.
                         */
                        processed++;
+
+                       if (saved_resultRelInfo)
+                       {
+                               resultRelInfo = saved_resultRelInfo;
+                               estate->es_result_relation_info = resultRelInfo;
+                       }
                }
        }
 
@@ -2548,6 +2749,36 @@ CopyFrom(CopyState cstate)
 
        ExecCloseIndices(resultRelInfo);
 
+       /* Close all the partitioned tables, leaf partitions, and their indices */
+       if (cstate->partition_dispatch_info)
+       {
+               int                     i;
+
+               /*
+                * Remember cstate->partition_dispatch_info[0] corresponds to the root
+                * partitioned table, which we must not try to close, because it is
+                * the main target table of COPY that will be closed eventually by
+                * DoCopy().  Also, tupslot is NULL for the root partitioned table.
+                */
+               for (i = 1; i < cstate->num_dispatch; i++)
+               {
+                       PartitionDispatch pd = cstate->partition_dispatch_info[i];
+
+                       heap_close(pd->reldesc, NoLock);
+                       ExecDropSingleTupleTableSlot(pd->tupslot);
+               }
+               for (i = 0; i < cstate->num_partitions; i++)
+               {
+                       ResultRelInfo *resultRelInfo = cstate->partitions + i;
+
+                       ExecCloseIndices(resultRelInfo);
+                       heap_close(resultRelInfo->ri_RelationDesc, NoLock);
+               }
+
+               /* Release the standalone partition tuple descriptor */
+               ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
+       }
+
        FreeExecutorState(estate);
 
        /*
@@ -2649,9 +2880,11 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
  */
 CopyState
-BeginCopyFrom(Relation rel,
+BeginCopyFrom(ParseState *pstate,
+                         Relation rel,
                          const char *filename,
                          bool is_program,
+                         copy_data_source_cb data_source_cb,
                          List *attnamelist,
                          List *options)
 {
@@ -2670,7 +2903,7 @@ BeginCopyFrom(Relation rel,
        MemoryContext oldcontext;
        bool            volatile_defexprs;
 
-       cstate = BeginCopy(true, rel, NULL, NULL, InvalidOid, attnamelist, options);
+       cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
        /* Initialize state variables */
@@ -2682,8 +2915,8 @@ BeginCopyFrom(Relation rel,
        cstate->cur_attval = NULL;
 
        /* Set up variables to avoid per-attribute overhead. */
-       initStringInfo(&cstate->attribute_buf);
-       initStringInfo(&cstate->line_buf);
+       initLongStringInfo(&cstate->attribute_buf);
+       initLongStringInfo(&cstate->line_buf);
        cstate->line_buf_converted = false;
        cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
        cstate->raw_buf_index = cstate->raw_buf_len = 0;
@@ -2747,7 +2980,7 @@ BeginCopyFrom(Relation rel,
                                 * the special case of when the default expression is the
                                 * nextval() of a sequence which in this specific case is
                                 * known to be safe for use with the multi-insert
-                                * optimisation. Hence we use this special case function
+                                * optimization. Hence we use this special case function
                                 * checker rather than the standard check for
                                 * contain_volatile_functions().
                                 */
@@ -2766,7 +2999,12 @@ BeginCopyFrom(Relation rel,
        cstate->num_defaults = num_defaults;
        cstate->is_program = is_program;
 
-       if (pipe)
+       if (data_source_cb)
+       {
+               cstate->copy_dest = COPY_CALLBACK;
+               cstate->data_source_cb = data_source_cb;
+       }
+       else if (pipe)
        {
                Assert(!is_program);    /* the grammar does not allow this */
                if (whereToSendOutput == DestRemote)
@@ -2793,10 +3031,18 @@ BeginCopyFrom(Relation rel,
 
                        cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
                        if (cstate->copy_file == NULL)
+                       {
+                               /* copy errno because ereport subfunctions might change it */
+                               int                     save_errno = errno;
+
                                ereport(ERROR,
                                                (errcode_for_file_access(),
                                                 errmsg("could not open file \"%s\" for reading: %m",
-                                                               cstate->filename)));
+                                                               cstate->filename),
+                                                (save_errno == ENOENT || save_errno == EACCES) ?
+                                                errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
+                                                                "You may want a client-side facility such as psql's \\copy.") : 0));
+                       }
 
                        if (fstat(fileno(cstate->copy_file), &st))
                                ereport(ERROR,
@@ -3172,7 +3418,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
                values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
-                                                                                &nulls[defmap[i]], NULL);
+                                                                                &nulls[defmap[i]]);
        }
 
        return true;