]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/copy.c
Post-PG 10 beta1 pgindent run
[postgresql] / src / backend / commands / copy.c
index 57429035e895e5b74f158e005b726ea3d40915ce..84b1a54cb9b4ed81015ef96a30f7c01179750d99 100644 (file)
@@ -3,7 +3,7 @@
  * copy.c
  *             Implements the COPY utility command
  *
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
 #include <arpa/inet.h>
 
 #include "access/heapam.h"
+#include "access/htup_details.h"
 #include "access/sysattr.h"
 #include "access/xact.h"
-#include "catalog/namespace.h"
+#include "access/xlog.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "commands/defrem.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
 #include "optimizer/planner.h"
+#include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
-#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/portal.h"
+#include "utils/rel.h"
+#include "utils/rls.h"
 #include "utils/snapmgr.h"
 
 
  */
 typedef enum CopyDest
 {
-       COPY_FILE,                                      /* to/from file */
+       COPY_FILE,                                      /* to/from file (or a piped program) */
        COPY_OLD_FE,                            /* to/from frontend (2.0 protocol) */
-       COPY_NEW_FE                                     /* to/from frontend (3.0 protocol) */
+       COPY_NEW_FE,                            /* to/from frontend (3.0 protocol) */
+       COPY_CALLBACK                           /* to/from callback function */
 } CopyDest;
 
 /*
@@ -104,8 +110,11 @@ typedef struct CopyStateData
        QueryDesc  *queryDesc;          /* executable query to copy from */
        List       *attnumlist;         /* integer list of attnums to copy */
        char       *filename;           /* filename, or NULL for STDIN/STDOUT */
+       bool            is_program;             /* is 'filename' a program to popen? */
+       copy_data_source_cb data_source_cb; /* function for reading data */
        bool            binary;                 /* binary format? */
        bool            oids;                   /* include OIDs? */
+       bool            freeze;                 /* freeze rows on loading? */
        bool            csv_mode;               /* Comma Separated Value format? */
        bool            header_line;    /* CSV header line? */
        char       *null_print;         /* NULL marker string (server encoding!) */
@@ -115,10 +124,15 @@ typedef struct CopyStateData
        char       *quote;                      /* CSV quote char (must be 1 byte) */
        char       *escape;                     /* CSV escape char (must be 1 byte) */
        List       *force_quote;        /* list of column names */
-       bool            force_quote_all;        /* FORCE QUOTE *? */
+       bool            force_quote_all;        /* FORCE_QUOTE *? */
        bool       *force_quote_flags;          /* per-column CSV FQ flags */
        List       *force_notnull;      /* list of column names */
        bool       *force_notnull_flags;        /* per-column CSV FNN flags */
+       List       *force_null;         /* list of column names */
+       bool       *force_null_flags;           /* per-column CSV FN flags */
+       bool            convert_selectively;    /* do selective binary conversion? */
+       List       *convert_select; /* list of column names (can be NIL) */
+       bool       *convert_select_flags;       /* per-column CSV/TEXT CS flags */
 
        /* these are just for error messages, see CopyFromErrorCallback */
        const char *cur_relname;        /* table name for error messages */
@@ -148,6 +162,15 @@ typedef struct CopyStateData
        Oid                *typioparams;        /* array of element types for in_functions */
        int                *defmap;                     /* array of default att numbers */
        ExprState **defexprs;           /* array of default att expressions */
+       bool            volatile_defexprs;              /* is any of defexprs volatile? */
+       List       *range_table;
+
+       PartitionDispatch *partition_dispatch_info;
+       int                     num_dispatch;   /* Number of entries in the above array */
+       int                     num_partitions; /* Number of members in the following arrays */
+       ResultRelInfo *partitions;      /* Per partition result relation */
+       TupleConversionMap **partition_tupconv_maps;
+       TupleTableSlot *partition_tuple_slot;
 
        /*
         * These variables are used to reduce overhead in textual COPY FROM.
@@ -173,10 +196,11 @@ typedef struct CopyStateData
         */
        StringInfoData line_buf;
        bool            line_buf_converted;             /* converted to server encoding? */
+       bool            line_buf_valid; /* contains the row being processed? */
 
        /*
         * Finally, raw_buf holds raw data read from the data source (file or
-        * client connection).  CopyReadLine parses this data sufficiently to
+        * client connection).  CopyReadLine parses this data sufficiently to
         * locate line boundaries, then transfers the data to line_buf and
         * converts it.  Note: we guarantee that there is a \0 at
         * raw_buf[raw_buf_len].
@@ -187,7 +211,7 @@ typedef struct CopyStateData
        int                     raw_buf_len;    /* total # of bytes stored */
 } CopyStateData;
 
-/* DestReceiver for COPY (SELECT) TO */
+/* DestReceiver for COPY (query) TO */
 typedef struct
 {
        DestReceiver pub;                       /* publicly-known function pointers */
@@ -202,7 +226,7 @@ typedef struct
  * function call overhead in tight COPY loops.
  *
  * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
- * prevent the continue/break processing from working. We end the "if (1)"
+ * prevent the continue/break processing from working.  We end the "if (1)"
  * with "else ((void) 0)" to ensure the "if" does not unintentionally match
  * any "else" in the calling code, and to avoid any compiler warnings about
  * empty statements.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
@@ -265,17 +289,25 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
-                 const char *queryString, List *attnamelist, List *options);
+static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
+                 RawStmt *raw_query, Oid queryRelId, List *attnamelist,
+                 List *options);
 static void EndCopy(CopyState cstate);
-static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
-                       const char *filename, List *attnamelist, List *options);
+static void ClosePipeToProgram(CopyState cstate);
+static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
+                       Oid queryRelId, const char *filename, bool is_program,
+                       List *attnamelist, List *options);
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
                         Datum *values, bool *nulls);
-static uint64 CopyFrom(CopyState cstate);
+static void CopyFromInsertBatch(CopyState cstate, EState *estate,
+                                       CommandId mycid, int hi_options,
+                                       ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+                                       BulkInsertState bistate,
+                                       int nBufferedTuples, HeapTuple *bufferedTuples,
+                                       int firstBufferedLineNo);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
 static int     CopyReadAttributesText(CopyState cstate);
@@ -295,7 +327,7 @@ static char *limit_printout_length(const char *str);
 static void SendCopyBegin(CopyState cstate);
 static void ReceiveCopyBegin(CopyState cstate);
 static void SendCopyEnd(CopyState cstate);
-static void CopySendData(CopyState cstate, void *databuf, int datasize);
+static void CopySendData(CopyState cstate, const void *databuf, int datasize);
 static void CopySendString(CopyState cstate, const char *str);
 static void CopySendChar(CopyState cstate, char c);
 static void CopySendEndOfRow(CopyState cstate);
@@ -330,7 +362,7 @@ SendCopyBegin(CopyState cstate)
                pq_endmessage(&buf);
                cstate->copy_dest = COPY_NEW_FE;
        }
-       else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
+       else
        {
                /* old way */
                if (cstate->binary)
@@ -342,18 +374,6 @@ SendCopyBegin(CopyState cstate)
                pq_startcopyout();
                cstate->copy_dest = COPY_OLD_FE;
        }
-       else
-       {
-               /* very old way */
-               if (cstate->binary)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                       errmsg("COPY BINARY is not supported to stdout or from stdin")));
-               pq_putemptymessage('B');
-               /* grottiness needed for old COPY OUT protocol */
-               pq_startcopyout();
-               cstate->copy_dest = COPY_OLD_FE;
-       }
 }
 
 static void
@@ -376,7 +396,7 @@ ReceiveCopyBegin(CopyState cstate)
                cstate->copy_dest = COPY_NEW_FE;
                cstate->fe_msgbuf = makeStringInfo();
        }
-       else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
+       else
        {
                /* old way */
                if (cstate->binary)
@@ -384,16 +404,8 @@ ReceiveCopyBegin(CopyState cstate)
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("COPY BINARY is not supported to stdout or from stdin")));
                pq_putemptymessage('G');
-               cstate->copy_dest = COPY_OLD_FE;
-       }
-       else
-       {
-               /* very old way */
-               if (cstate->binary)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                       errmsg("COPY BINARY is not supported to stdout or from stdin")));
-               pq_putemptymessage('D');
+               /* any error in old protocol will make us lose sync */
+               pq_startmsgread();
                cstate->copy_dest = COPY_OLD_FE;
        }
        /* We *must* flush here to ensure FE knows it can send. */
@@ -430,9 +442,9 @@ SendCopyEnd(CopyState cstate)
  *----------
  */
 static void
-CopySendData(CopyState cstate, void *databuf, int datasize)
+CopySendData(CopyState cstate, const void *databuf, int datasize)
 {
-       appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize);
+       appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
 }
 
 static void
@@ -465,12 +477,38 @@ CopySendEndOfRow(CopyState cstate)
 #endif
                        }
 
-                       (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
-                                                 1, cstate->copy_file);
-                       if (ferror(cstate->copy_file))
-                               ereport(ERROR,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not write to COPY file: %m")));
+                       if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
+                                          cstate->copy_file) != 1 ||
+                               ferror(cstate->copy_file))
+                       {
+                               if (cstate->is_program)
+                               {
+                                       if (errno == EPIPE)
+                                       {
+                                               /*
+                                                * The pipe will be closed automatically on error at
+                                                * the end of transaction, but we might get a better
+                                                * error message from the subprocess' exit code than
+                                                * just "Broken Pipe"
+                                                */
+                                               ClosePipeToProgram(cstate);
+
+                                               /*
+                                                * If ClosePipeToProgram() didn't throw an error, the
+                                                * program terminated normally, but closed the pipe
+                                                * first. Restore errno, and throw an error.
+                                                */
+                                               errno = EPIPE;
+                                       }
+                                       ereport(ERROR,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("could not write to COPY program: %m")));
+                               }
+                               else
+                                       ereport(ERROR,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("could not write to COPY file: %m")));
+                       }
                        break;
                case COPY_OLD_FE:
                        /* The FE/BE protocol uses \n as newline for all platforms */
@@ -493,6 +531,9 @@ CopySendEndOfRow(CopyState cstate)
                        /* Dump the accumulated row as one CopyData message */
                        (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
                        break;
+               case COPY_CALLBACK:
+                       Assert(false);          /* Not yet supported. */
+                       break;
        }
 
        resetStringInfo(fe_msgbuf);
@@ -502,7 +543,7 @@ CopySendEndOfRow(CopyState cstate)
  * CopyGetData reads data from the source (file or frontend)
  *
  * We attempt to read at least minread, and at most maxread, bytes from
- * the source. The actual number of bytes read is returned; if this is
+ * the source.  The actual number of bytes read is returned; if this is
  * less than minread, EOF was detected.
  *
  * Note: when copying from the frontend, we expect a proper EOF mark per
@@ -539,7 +580,7 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
                                /* Only a \. terminator is legal EOF in old protocol */
                                ereport(ERROR,
                                                (errcode(ERRCODE_CONNECTION_FAILURE),
-                                                errmsg("unexpected EOF on client connection")));
+                                                errmsg("unexpected EOF on client connection with an open transaction")));
                        }
                        bytesread = minread;
                        break;
@@ -554,15 +595,18 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
                                        int                     mtype;
 
                        readmessage:
+                                       HOLD_CANCEL_INTERRUPTS();
+                                       pq_startmsgread();
                                        mtype = pq_getbyte();
                                        if (mtype == EOF)
                                                ereport(ERROR,
                                                                (errcode(ERRCODE_CONNECTION_FAILURE),
-                                                        errmsg("unexpected EOF on client connection")));
+                                                                errmsg("unexpected EOF on client connection with an open transaction")));
                                        if (pq_getmessage(cstate->fe_msgbuf, 0))
                                                ereport(ERROR,
                                                                (errcode(ERRCODE_CONNECTION_FAILURE),
-                                                        errmsg("unexpected EOF on client connection")));
+                                                                errmsg("unexpected EOF on client connection with an open transaction")));
+                                       RESUME_CANCEL_INTERRUPTS();
                                        switch (mtype)
                                        {
                                                case 'd':               /* CopyData */
@@ -604,6 +648,9 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
                                bytesread += avail;
                        }
                        break;
+               case COPY_CALLBACK:
+                       bytesread = cstate->data_source_cb(databuf, minread, maxread);
+                       break;
        }
 
        return bytesread;
@@ -716,10 +763,11 @@ CopyLoadRawBuf(CopyState cstate)
  *
  * Either unload or reload contents of table <relation>, depending on <from>.
  * (<from> = TRUE means we are inserting into the table.)  In the "TO" case
- * we also support copying the output of an arbitrary SELECT query.
+ * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
+ * or DELETE query.
  *
  * If <pipe> is false, transfer is between the table and the file named
- * <filename>. Otherwise, transfer is between the table and our regular
+ * <filename>.  Otherwise, transfer is between the table and our regular
  * input/output stream. The latter could be either stdin/stdout or a
  * socket, depending on whether we're running under Postmaster control.
  *
@@ -729,30 +777,41 @@ CopyLoadRawBuf(CopyState cstate)
  * Do not allow the copy if user doesn't have proper permission to access
  * the table or the specifically requested columns.
  */
-uint64
-DoCopy(const CopyStmt *stmt, const char *queryString)
+void
+DoCopy(ParseState *pstate, const CopyStmt *stmt,
+          int stmt_location, int stmt_len,
+          uint64 *processed)
 {
        CopyState       cstate;
        bool            is_from = stmt->is_from;
        bool            pipe = (stmt->filename == NULL);
        Relation        rel;
-       uint64          processed;
+       Oid                     relid;
+       RawStmt    *query = NULL;
 
-       /* Disallow file COPY except to superusers. */
+       /* Disallow COPY to/from file or program except to superusers. */
        if (!pipe && !superuser())
-               ereport(ERROR,
-                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                errmsg("must be superuser to COPY to or from a file"),
-                                errhint("Anyone can COPY to stdout or from stdin. "
-                                                "psql's \\copy command also works for anyone.")));
+       {
+               if (stmt->is_program)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("must be superuser to COPY to or from an external program"),
+                                        errhint("Anyone can COPY to stdout or from stdin. "
+                                                  "psql's \\copy command also works for anyone.")));
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("must be superuser to COPY to or from a file"),
+                                        errhint("Anyone can COPY to stdout or from stdin. "
+                                                  "psql's \\copy command also works for anyone.")));
+       }
 
        if (stmt->relation)
        {
                TupleDesc       tupDesc;
-               AclMode         required_access = (is_from ? ACL_INSERT : ACL_SELECT);
-               RangeTblEntry *rte;
                List       *attnums;
                ListCell   *cur;
+               RangeTblEntry *rte;
 
                Assert(!stmt->query);
 
@@ -760,11 +819,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
                rel = heap_openrv(stmt->relation,
                                                  (is_from ? RowExclusiveLock : AccessShareLock));
 
-               rte = makeNode(RangeTblEntry);
-               rte->rtekind = RTE_RELATION;
-               rte->relid = RelationGetRelid(rel);
-               rte->relkind = rel->rd_rel->relkind;
-               rte->requiredPerms = required_access;
+               relid = RelationGetRelid(rel);
+
+               rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
+               rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
 
                tupDesc = RelationGetDescr(rel);
                attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
@@ -774,35 +832,155 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
                        FirstLowInvalidHeapAttributeNumber;
 
                        if (is_from)
-                               rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
+                               rte->insertedCols = bms_add_member(rte->insertedCols, attno);
                        else
                                rte->selectedCols = bms_add_member(rte->selectedCols, attno);
                }
-               ExecCheckRTPerms(list_make1(rte), true);
+               ExecCheckRTPerms(pstate->p_rtable, true);
+
+               /*
+                * Permission check for row security policies.
+                *
+                * check_enable_rls will ereport(ERROR) if the user has requested
+                * something invalid and will otherwise indicate if we should enable
+                * RLS (returns RLS_ENABLED) or not for this COPY statement.
+                *
+                * If the relation has a row security policy and we are to apply it
+                * then perform a "query" copy and allow the normal query processing
+                * to handle the policies.
+                *
+                * If RLS is not enabled for this, then just fall through to the
+                * normal non-filtering relation handling.
+                */
+               if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
+               {
+                       SelectStmt *select;
+                       ColumnRef  *cr;
+                       ResTarget  *target;
+                       RangeVar   *from;
+                       List       *targetList = NIL;
+
+                       if (is_from)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                  errmsg("COPY FROM not supported with row-level security"),
+                                                errhint("Use INSERT statements instead.")));
+
+                       /*
+                        * Build target list
+                        *
+                        * If no columns are specified in the attribute list of the COPY
+                        * command, then the target list is 'all' columns. Therefore, '*'
+                        * should be used as the target list for the resulting SELECT
+                        * statement.
+                        *
+                        * In the case that columns are specified in the attribute list,
+                        * create a ColumnRef and ResTarget for each column and add them
+                        * to the target list for the resulting SELECT statement.
+                        */
+                       if (!stmt->attlist)
+                       {
+                               cr = makeNode(ColumnRef);
+                               cr->fields = list_make1(makeNode(A_Star));
+                               cr->location = -1;
+
+                               target = makeNode(ResTarget);
+                               target->name = NULL;
+                               target->indirection = NIL;
+                               target->val = (Node *) cr;
+                               target->location = -1;
+
+                               targetList = list_make1(target);
+                       }
+                       else
+                       {
+                               ListCell   *lc;
+
+                               foreach(lc, stmt->attlist)
+                               {
+                                       /*
+                                        * Build the ColumnRef for each column.  The ColumnRef
+                                        * 'fields' property is a String 'Value' node (see
+                                        * nodes/value.h) that corresponds to the column name
+                                        * respectively.
+                                        */
+                                       cr = makeNode(ColumnRef);
+                                       cr->fields = list_make1(lfirst(lc));
+                                       cr->location = -1;
+
+                                       /* Build the ResTarget and add the ColumnRef to it. */
+                                       target = makeNode(ResTarget);
+                                       target->name = NULL;
+                                       target->indirection = NIL;
+                                       target->val = (Node *) cr;
+                                       target->location = -1;
+
+                                       /* Add each column to the SELECT statement's target list */
+                                       targetList = lappend(targetList, target);
+                               }
+                       }
+
+                       /*
+                        * Build RangeVar for from clause, fully qualified based on the
+                        * relation which we have opened and locked.
+                        */
+                       from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
+                                                               pstrdup(RelationGetRelationName(rel)),
+                                                               -1);
+
+                       /* Build query */
+                       select = makeNode(SelectStmt);
+                       select->targetList = targetList;
+                       select->fromClause = list_make1(from);
+
+                       query = makeNode(RawStmt);
+                       query->stmt = (Node *) select;
+                       query->stmt_location = stmt_location;
+                       query->stmt_len = stmt_len;
+
+                       /*
+                        * Close the relation for now, but keep the lock on it to prevent
+                        * changes between now and when we start the query-based COPY.
+                        *
+                        * We'll reopen it later as part of the query-based COPY.
+                        */
+                       heap_close(rel, NoLock);
+                       rel = NULL;
+               }
        }
        else
        {
                Assert(stmt->query);
 
+               query = makeNode(RawStmt);
+               query->stmt = stmt->query;
+               query->stmt_location = stmt_location;
+               query->stmt_len = stmt_len;
+
+               relid = InvalidOid;
                rel = NULL;
        }
 
        if (is_from)
        {
-               /* check read-only transaction */
-               if (XactReadOnly && rel->rd_backend != MyBackendId)
+               Assert(rel);
+
+               /* check read-only transaction and parallel mode */
+               if (XactReadOnly && !rel->rd_islocaltemp)
                        PreventCommandIfReadOnly("COPY FROM");
+               PreventCommandIfParallelMode("COPY FROM");
 
-               cstate = BeginCopyFrom(rel, stmt->filename,
-                                                          stmt->attlist, stmt->options);
-               processed = CopyFrom(cstate);   /* copy from file to database */
+               cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
+                                                          NULL, stmt->attlist, stmt->options);
+               *processed = CopyFrom(cstate);  /* copy from file to database */
                EndCopyFrom(cstate);
        }
        else
        {
-               cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
+               cstate = BeginCopyTo(pstate, rel, query, relid,
+                                                        stmt->filename, stmt->is_program,
                                                         stmt->attlist, stmt->options);
-               processed = DoCopyTo(cstate);   /* copy from database to file */
+               *processed = DoCopyTo(cstate);  /* copy from database to file */
                EndCopyTo(cstate);
        }
 
@@ -813,8 +991,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
         */
        if (rel != NULL)
                heap_close(rel, (is_from ? NoLock : AccessShareLock));
-
-       return processed;
 }
 
 /*
@@ -835,7 +1011,8 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
  * self-consistency of the options list.
  */
 void
-ProcessCopyOptions(CopyState cstate,
+ProcessCopyOptions(ParseState *pstate,
+                                  CopyState cstate,
                                   bool is_from,
                                   List *options)
 {
@@ -851,7 +1028,7 @@ ProcessCopyOptions(CopyState cstate,
        /* Extract options from the statement node tree */
        foreach(option, options)
        {
-               DefElem    *defel = (DefElem *) lfirst(option);
+               DefElem    *defel = lfirst_node(DefElem, option);
 
                if (strcmp(defel->defname, "format") == 0)
                {
@@ -860,7 +1037,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (format_specified)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        format_specified = true;
                        if (strcmp(fmt, "text") == 0)
                                 /* default format */ ;
@@ -871,22 +1049,34 @@ ProcessCopyOptions(CopyState cstate,
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                                errmsg("COPY format \"%s\" not recognized", fmt)));
+                                                errmsg("COPY format \"%s\" not recognized", fmt),
+                                                parser_errposition(pstate, defel->location)));
                }
                else if (strcmp(defel->defname, "oids") == 0)
                {
                        if (cstate->oids)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->oids = defGetBoolean(defel);
                }
+               else if (strcmp(defel->defname, "freeze") == 0)
+               {
+                       if (cstate->freeze)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
+                       cstate->freeze = defGetBoolean(defel);
+               }
                else if (strcmp(defel->defname, "delimiter") == 0)
                {
                        if (cstate->delim)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->delim = defGetString(defel);
                }
                else if (strcmp(defel->defname, "null") == 0)
@@ -894,7 +1084,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->null_print)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->null_print = defGetString(defel);
                }
                else if (strcmp(defel->defname, "header") == 0)
@@ -902,7 +1093,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->header_line)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->header_line = defGetBoolean(defel);
                }
                else if (strcmp(defel->defname, "quote") == 0)
@@ -910,7 +1102,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->quote)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->quote = defGetString(defel);
                }
                else if (strcmp(defel->defname, "escape") == 0)
@@ -918,7 +1111,8 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->escape)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->escape = defGetString(defel);
                }
                else if (strcmp(defel->defname, "force_quote") == 0)
@@ -926,49 +1120,93 @@ ProcessCopyOptions(CopyState cstate,
                        if (cstate->force_quote || cstate->force_quote_all)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        if (defel->arg && IsA(defel->arg, A_Star))
                                cstate->force_quote_all = true;
                        else if (defel->arg && IsA(defel->arg, List))
-                               cstate->force_quote = (List *) defel->arg;
+                               cstate->force_quote = castNode(List, defel->arg);
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("argument to option \"%s\" must be a list of column names",
-                                                               defel->defname)));
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
                }
                else if (strcmp(defel->defname, "force_not_null") == 0)
                {
                        if (cstate->force_notnull)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
+                       if (defel->arg && IsA(defel->arg, List))
+                               cstate->force_notnull = castNode(List, defel->arg);
+                       else
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("argument to option \"%s\" must be a list of column names",
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
+               }
+               else if (strcmp(defel->defname, "force_null") == 0)
+               {
+                       if (cstate->force_null)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
                        if (defel->arg && IsA(defel->arg, List))
-                               cstate->force_notnull = (List *) defel->arg;
+                               cstate->force_null = castNode(List, defel->arg);
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("argument to option \"%s\" must be a list of column names",
-                                                               defel->defname)));
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
+               }
+               else if (strcmp(defel->defname, "convert_selectively") == 0)
+               {
+                       /*
+                        * Undocumented, not-accessible-from-SQL option: convert only the
+                        * named columns to binary form, storing the rest as NULLs. It's
+                        * allowed for the column list to be NIL.
+                        */
+                       if (cstate->convert_selectively)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
+                       cstate->convert_selectively = true;
+                       if (defel->arg == NULL || IsA(defel->arg, List))
+                               cstate->convert_select = castNode(List, defel->arg);
+                       else
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("argument to option \"%s\" must be a list of column names",
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
                }
                else if (strcmp(defel->defname, "encoding") == 0)
                {
                        if (cstate->file_encoding >= 0)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                                errmsg("conflicting or redundant options")));
+                                                errmsg("conflicting or redundant options"),
+                                                parser_errposition(pstate, defel->location)));
                        cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
                        if (cstate->file_encoding < 0)
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                                 errmsg("argument to option \"%s\" must be a valid encoding name",
-                                                               defel->defname)));
+                                                               defel->defname),
+                                                parser_errposition(pstate, defel->location)));
                }
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
                                         errmsg("option \"%s\" not recognized",
-                                                       defel->defname)));
+                                                       defel->defname),
+                                        parser_errposition(pstate, defel->location)));
        }
 
        /*
@@ -1090,6 +1328,17 @@ ProcessCopyOptions(CopyState cstate,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                          errmsg("COPY force not null only available using COPY FROM")));
 
+       /* Check force_null */
+       if (!cstate->csv_mode && cstate->force_null != NIL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("COPY force null available only in CSV mode")));
+
+       if (cstate->force_null != NIL && !is_from)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("COPY force null only available using COPY FROM")));
+
        /* Don't allow the delimiter to appear in the null string. */
        if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
                ereport(ERROR,
@@ -1120,10 +1369,11 @@ ProcessCopyOptions(CopyState cstate,
  * NULL values as <null_print>.
  */
 static CopyState
-BeginCopy(bool is_from,
+BeginCopy(ParseState *pstate,
+                 bool is_from,
                  Relation rel,
-                 Node *raw_query,
-                 const char *queryString,
+                 RawStmt *raw_query,
+                 Oid queryRelId,
                  List *attnamelist,
                  List *options)
 {
@@ -1141,14 +1391,12 @@ BeginCopy(bool is_from,
         */
        cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
                                                                                                "COPY",
-                                                                                               ALLOCSET_DEFAULT_MINSIZE,
-                                                                                               ALLOCSET_DEFAULT_INITSIZE,
-                                                                                               ALLOCSET_DEFAULT_MAXSIZE);
+                                                                                               ALLOCSET_DEFAULT_SIZES);
 
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
        /* Extract options from the statement node tree */
-       ProcessCopyOptions(cstate, is_from, options);
+       ProcessCopyOptions(pstate, cstate, is_from, options);
 
        /* Process the source/target relation or query */
        if (rel)
@@ -1165,6 +1413,30 @@ BeginCopy(bool is_from,
                                        (errcode(ERRCODE_UNDEFINED_COLUMN),
                                         errmsg("table \"%s\" does not have OIDs",
                                                        RelationGetRelationName(cstate->rel))));
+
+               /* Initialize state for CopyFrom tuple routing. */
+               if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               {
+                       PartitionDispatch *partition_dispatch_info;
+                       ResultRelInfo *partitions;
+                       TupleConversionMap **partition_tupconv_maps;
+                       TupleTableSlot *partition_tuple_slot;
+                       int                     num_parted,
+                                               num_partitions;
+
+                       ExecSetupPartitionTupleRouting(rel,
+                                                                                  &partition_dispatch_info,
+                                                                                  &partitions,
+                                                                                  &partition_tupconv_maps,
+                                                                                  &partition_tuple_slot,
+                                                                                  &num_parted, &num_partitions);
+                       cstate->partition_dispatch_info = partition_dispatch_info;
+                       cstate->num_dispatch = num_parted;
+                       cstate->partitions = partitions;
+                       cstate->num_partitions = num_partitions;
+                       cstate->partition_tupconv_maps = partition_tupconv_maps;
+                       cstate->partition_tuple_slot = partition_tuple_slot;
+               }
        }
        else
        {
@@ -1176,14 +1448,14 @@ BeginCopy(bool is_from,
                Assert(!is_from);
                cstate->rel = NULL;
 
-               /* Don't allow COPY w/ OIDs from a select */
+               /* Don't allow COPY w/ OIDs from a query */
                if (cstate->oids)
                        ereport(ERROR,
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                        errmsg("COPY (SELECT) WITH OIDS is not supported")));
+                                        errmsg("COPY (query) WITH OIDS is not supported")));
 
                /*
-                * Run parse analysis and rewrite.      Note this also acquires sufficient
+                * Run parse analysis and rewrite.  Note this also acquires sufficient
                 * locks on the source table(s).
                 *
                 * Because the parser and planner tend to scribble on their input, we
@@ -1192,25 +1464,95 @@ BeginCopy(bool is_from,
                 * function and is executed repeatedly.  (See also the same hack in
                 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
                 */
-               rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
-                                                                                  queryString, NULL, 0);
+               rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
+                                                                                  pstate->p_sourcetext, NULL, 0,
+                                                                                  NULL);
 
-               /* We don't expect more or less than one result query */
-               if (list_length(rewritten) != 1)
-                       elog(ERROR, "unexpected rewrite result");
+               /* check that we got back something we can work with */
+               if (rewritten == NIL)
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                        errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
+               }
+               else if (list_length(rewritten) > 1)
+               {
+                       ListCell   *lc;
 
-               query = (Query *) linitial(rewritten);
-               Assert(query->commandType == CMD_SELECT);
-               Assert(query->utilityStmt == NULL);
+                       /* examine queries to determine which error message to issue */
+                       foreach(lc, rewritten)
+                       {
+                               Query      *q = lfirst_node(Query, lc);
+
+                               if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                        errmsg("conditional DO INSTEAD rules are not supported for COPY")));
+                               if (q->querySource == QSRC_NON_INSTEAD_RULE)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                       errmsg("DO ALSO rules are not supported for the COPY")));
+                       }
 
-               /* Query mustn't use INTO, either */
-               if (query->intoClause)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
+               }
+
+               query = linitial_node(Query, rewritten);
+
+               /* The grammar allows SELECT INTO, but we don't support that */
+               if (query->utilityStmt != NULL &&
+                       IsA(query->utilityStmt, CreateTableAsStmt))
                        ereport(ERROR,
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                         errmsg("COPY (SELECT INTO) is not supported")));
 
+               Assert(query->utilityStmt == NULL);
+
+               /*
+                * Similarly the grammar doesn't enforce the presence of a RETURNING
+                * clause, but this is required here.
+                */
+               if (query->commandType != CMD_SELECT &&
+                       query->returningList == NIL)
+               {
+                       Assert(query->commandType == CMD_INSERT ||
+                                  query->commandType == CMD_UPDATE ||
+                                  query->commandType == CMD_DELETE);
+
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("COPY query must have a RETURNING clause")));
+               }
+
                /* plan the query */
-               plan = planner(query, 0, NULL);
+               plan = pg_plan_query(query, 0, NULL);
+
+               /*
+                * With row level security and a user using "COPY relation TO", we
+                * have to convert the "COPY relation TO" to a query-based COPY (eg:
+                * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
+                * in any RLS clauses.
+                *
+                * When this happens, we are passed in the relid of the originally
+                * found relation (which we have locked).  As the planner will look up
+                * the relation again, we double-check here to make sure it found the
+                * same one that we have locked.
+                */
+               if (queryRelId != InvalidOid)
+               {
+                       /*
+                        * Note that with RLS involved there may be multiple relations,
+                        * and while the one we need is almost certainly first, we don't
+                        * make any guarantees of that in the planner, so check the whole
+                        * list and make sure we find the original relation.
+                        */
+                       if (!list_member_oid(plan->relationOids, queryRelId))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                               errmsg("relation referenced by COPY statement has changed")));
+               }
 
                /*
                 * Use a snapshot with an updated command ID to ensure this query sees
@@ -1224,10 +1566,10 @@ BeginCopy(bool is_from,
                ((DR_copy *) dest)->cstate = cstate;
 
                /* Create a QueryDesc requesting no output */
-               cstate->queryDesc = CreateQueryDesc(plan, queryString,
+               cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
                                                                                        GetActiveSnapshot(),
                                                                                        InvalidSnapshot,
-                                                                                       dest, NULL, 0);
+                                                                                       dest, NULL, NULL, 0);
 
                /*
                 * Call ExecutorStart to prepare the plan for execution.
@@ -1244,7 +1586,7 @@ BeginCopy(bool is_from,
 
        num_phys_attrs = tupDesc->natts;
 
-       /* Convert FORCE QUOTE name list to per-column flags, check validity */
+       /* Convert FORCE_QUOTE name list to per-column flags, check validity */
        cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
        if (cstate->force_quote_all)
        {
@@ -1267,13 +1609,13 @@ BeginCopy(bool is_from,
                        if (!list_member_int(cstate->attnumlist, attnum))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-                                  errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
+                                  errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
                                                  NameStr(tupDesc->attrs[attnum - 1]->attname))));
                        cstate->force_quote_flags[attnum - 1] = true;
                }
        }
 
-       /* Convert FORCE NOT NULL name list to per-column flags, check validity */
+       /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
        cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
        if (cstate->force_notnull)
        {
@@ -1289,12 +1631,57 @@ BeginCopy(bool is_from,
                        if (!list_member_int(cstate->attnumlist, attnum))
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-                               errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
+                               errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
                                           NameStr(tupDesc->attrs[attnum - 1]->attname))));
                        cstate->force_notnull_flags[attnum - 1] = true;
                }
        }
 
+       /* Convert FORCE_NULL name list to per-column flags, check validity */
+       cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
+       if (cstate->force_null)
+       {
+               List       *attnums;
+               ListCell   *cur;
+
+               attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
+
+               foreach(cur, attnums)
+               {
+                       int                     attnum = lfirst_int(cur);
+
+                       if (!list_member_int(cstate->attnumlist, attnum))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+                                       errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
+                                                  NameStr(tupDesc->attrs[attnum - 1]->attname))));
+                       cstate->force_null_flags[attnum - 1] = true;
+               }
+       }
+
+       /* Convert convert_selectively name list to per-column flags */
+       if (cstate->convert_selectively)
+       {
+               List       *attnums;
+               ListCell   *cur;
+
+               cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
+
+               attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
+
+               foreach(cur, attnums)
+               {
+                       int                     attnum = lfirst_int(cur);
+
+                       if (!list_member_int(cstate->attnumlist, attnum))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+                                                errmsg_internal("selected column \"%s\" not referenced by COPY",
+                                                        NameStr(tupDesc->attrs[attnum - 1]->attname))));
+                       cstate->convert_select_flags[attnum - 1] = true;
+               }
+       }
+
        /* Use client encoding when ENCODING option is not specified. */
        if (cstate->file_encoding < 0)
                cstate->file_encoding = pg_get_client_encoding();
@@ -1318,16 +1705,46 @@ BeginCopy(bool is_from,
 }
 
 /*
- * Release resources allocated in a cstate for COPY TO/FROM.
+ * Closes the pipe to an external program, checking the pclose() return code.
  */
 static void
-EndCopy(CopyState cstate)
+ClosePipeToProgram(CopyState cstate)
 {
-       if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+       int                     pclose_rc;
+
+       Assert(cstate->is_program);
+
+       pclose_rc = ClosePipeStream(cstate->copy_file);
+       if (pclose_rc == -1)
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not close file \"%s\": %m",
-                                               cstate->filename)));
+                                errmsg("could not close pipe to external command: %m")));
+       else if (pclose_rc != 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+                                errmsg("program \"%s\" failed",
+                                               cstate->filename),
+                                errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+}
+
+/*
+ * Release resources allocated in a cstate for COPY TO/FROM.
+ */
+static void
+EndCopy(CopyState cstate)
+{
+       if (cstate->is_program)
+       {
+               ClosePipeToProgram(cstate);
+       }
+       else
+       {
+               if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not close file \"%s\": %m",
+                                                       cstate->filename)));
+       }
 
        MemoryContextDelete(cstate->copycontext);
        pfree(cstate);
@@ -1337,10 +1754,12 @@ EndCopy(CopyState cstate)
  * Setup CopyState to read tuples from a table or a query for COPY TO.
  */
 static CopyState
-BeginCopyTo(Relation rel,
-                       Node *query,
-                       const char *queryString,
+BeginCopyTo(ParseState *pstate,
+                       Relation rel,
+                       RawStmt *query,
+                       Oid queryRelId,
                        const char *filename,
+                       bool is_program,
                        List *attnamelist,
                        List *options)
 {
@@ -1356,6 +1775,12 @@ BeginCopyTo(Relation rel,
                                         errmsg("cannot copy from view \"%s\"",
                                                        RelationGetRelationName(rel)),
                                         errhint("Try the COPY (SELECT ...) TO variant.")));
+               else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot copy from materialized view \"%s\"",
+                                                       RelationGetRelationName(rel)),
+                                        errhint("Try the COPY (SELECT ...) TO variant.")));
                else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -1367,6 +1792,12 @@ BeginCopyTo(Relation rel,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("cannot copy from sequence \"%s\"",
                                                        RelationGetRelationName(rel))));
+               else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot copy from partitioned table \"%s\"",
+                                                       RelationGetRelationName(rel)),
+                                        errhint("Try the COPY (SELECT ...) TO variant.")));
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -1374,44 +1805,72 @@ BeginCopyTo(Relation rel,
                                                        RelationGetRelationName(rel))));
        }
 
-       cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
+       cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
+                                          options);
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
        if (pipe)
        {
+               Assert(!is_program);    /* the grammar does not allow this */
                if (whereToSendOutput != DestRemote)
                        cstate->copy_file = stdout;
        }
        else
        {
-               mode_t          oumask;         /* Pre-existing umask value */
-               struct stat st;
+               cstate->filename = pstrdup(filename);
+               cstate->is_program = is_program;
 
-               /*
-                * Prevent write to relative path ... too easy to shoot oneself in the
-                * foot by overwriting a database file ...
-                */
-               if (!is_absolute_path(filename))
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INVALID_NAME),
-                                        errmsg("relative path not allowed for COPY to file")));
+               if (is_program)
+               {
+                       cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
+                       if (cstate->copy_file == NULL)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not execute command \"%s\": %m",
+                                                               cstate->filename)));
+               }
+               else
+               {
+                       mode_t          oumask; /* Pre-existing umask value */
+                       struct stat st;
 
-               cstate->filename = pstrdup(filename);
-               oumask = umask(S_IWGRP | S_IWOTH);
-               cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
-               umask(oumask);
+                       /*
+                        * Prevent write to relative path ... too easy to shoot oneself in
+                        * the foot by overwriting a database file ...
+                        */
+                       if (!is_absolute_path(filename))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_NAME),
+                                         errmsg("relative path not allowed for COPY to file")));
 
-               if (cstate->copy_file == NULL)
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not open file \"%s\" for writing: %m",
-                                                       cstate->filename)));
+                       oumask = umask(S_IWGRP | S_IWOTH);
+                       cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+                       umask(oumask);
+                       if (cstate->copy_file == NULL)
+                       {
+                               /* copy errno because ereport subfunctions might change it */
+                               int                     save_errno = errno;
 
-               fstat(fileno(cstate->copy_file), &st);
-               if (S_ISDIR(st.st_mode))
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                        errmsg("\"%s\" is a directory", cstate->filename)));
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not open file \"%s\" for writing: %m",
+                                                               cstate->filename),
+                                                (save_errno == ENOENT || save_errno == EACCES) ?
+                                                errhint("COPY TO instructs the PostgreSQL server process to write a file. "
+                                                                "You may want a client-side facility such as psql's \\copy.") : 0));
+                       }
+
+                       if (fstat(fileno(cstate->copy_file), &st))
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not stat file \"%s\": %m",
+                                                               cstate->filename)));
+
+                       if (S_ISDIR(st.st_mode))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                                errmsg("\"%s\" is a directory", cstate->filename)));
+               }
        }
 
        MemoryContextSwitchTo(oldcontext);
@@ -1520,13 +1979,11 @@ CopyTo(CopyState cstate)
         * Create a temporary memory context that we can reset once per row to
         * recover palloc'd memory.  This avoids any problems with leaks inside
         * datatype output routines, and should be faster than retail pfree's
-        * anyway.      (We don't need a whole econtext as CopyFrom does.)
+        * anyway.  (We don't need a whole econtext as CopyFrom does.)
         */
        cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
                                                                                           "COPY TO",
-                                                                                          ALLOCSET_DEFAULT_MINSIZE,
-                                                                                          ALLOCSET_DEFAULT_INITSIZE,
-                                                                                          ALLOCSET_DEFAULT_MAXSIZE);
+                                                                                          ALLOCSET_DEFAULT_SIZES);
 
        if (cstate->binary)
        {
@@ -1534,7 +1991,7 @@ CopyTo(CopyState cstate)
                int32           tmp;
 
                /* Signature */
-               CopySendData(cstate, (char *) BinarySignature, 11);
+               CopySendData(cstate, BinarySignature, 11);
                /* Flags field */
                tmp = 0;
                if (cstate->oids)
@@ -1612,7 +2069,7 @@ CopyTo(CopyState cstate)
        else
        {
                /* run the plan --- the dest receiver will send tuples */
-               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
                processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
        }
 
@@ -1764,8 +2221,18 @@ CopyFromErrorCallback(void *arg)
                }
                else
                {
-                       /* error is relevant to a particular line */
-                       if (cstate->line_buf_converted || !cstate->need_transcoding)
+                       /*
+                        * Error is relevant to a particular line.
+                        *
+                        * If line_buf still contains the correct line, and it's already
+                        * transcoded, print it. If it's still in a foreign encoding, it's
+                        * quite likely that the error is precisely a failure to do
+                        * encoding conversion (ie, bad data). We dare not try to convert
+                        * it, and at present there's no way to regurgitate it without
+                        * conversion. So we have to punt and just report the line number.
+                        */
+                       if (cstate->line_buf_valid &&
+                               (cstate->line_buf_converted || !cstate->need_transcoding))
                        {
                                char       *lineval;
 
@@ -1776,14 +2243,6 @@ CopyFromErrorCallback(void *arg)
                        }
                        else
                        {
-                               /*
-                                * Here, the line buffer is still in a foreign encoding, and
-                                * indeed it's quite likely that the error is precisely a
-                                * failure to do encoding conversion (ie, bad data).  We dare
-                                * not try to convert it, and at present there's no way to
-                                * regurgitate it without conversion.  So we have to punt and
-                                * just report the line number.
-                                */
                                errcontext("COPY %s, line %d",
                                                   cstate->cur_relname, cstate->cur_lineno);
                        }
@@ -1829,7 +2288,7 @@ limit_printout_length(const char *str)
 /*
  * Copy FROM file to relation.
  */
-static uint64
+uint64
 CopyFrom(CopyState cstate)
 {
        HeapTuple       tuple;
@@ -1837,24 +2296,48 @@ CopyFrom(CopyState cstate)
        Datum      *values;
        bool       *nulls;
        ResultRelInfo *resultRelInfo;
+       ResultRelInfo *saved_resultRelInfo = NULL;
        EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
        ExprContext *econtext;
        TupleTableSlot *myslot;
        MemoryContext oldcontext = CurrentMemoryContext;
-       ErrorContextCallback errcontext;
+
+       ErrorContextCallback errcallback;
        CommandId       mycid = GetCurrentCommandId(true);
        int                     hi_options = 0; /* start with default heap_insert options */
        BulkInsertState bistate;
        uint64          processed = 0;
+       bool            useHeapMultiInsert;
+       int                     nBufferedTuples = 0;
+       int                     prev_leaf_part_index = -1;
+
+#define MAX_BUFFERED_TUPLES 1000
+       HeapTuple  *bufferedTuples = NULL;      /* initialize to silence warning */
+       Size            bufferedTuplesSize = 0;
+       int                     firstBufferedLineNo = 0;
 
        Assert(cstate->rel);
 
-       if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
+       /*
+        * The target must be a plain relation or have an INSTEAD OF INSERT row
+        * trigger.  (Currently, such triggers are only allowed on views, so we
+        * only hint about them in the view case.)
+        */
+       if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+               cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
+               !(cstate->rel->trigdesc &&
+                 cstate->rel->trigdesc->trig_insert_instead_row))
        {
                if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("cannot copy to view \"%s\"",
+                                                       RelationGetRelationName(cstate->rel)),
+                                        errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
+               else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot copy to materialized view \"%s\"",
                                                        RelationGetRelationName(cstate->rel))));
                else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
                        ereport(ERROR,
@@ -1887,9 +2370,17 @@ CopyFrom(CopyState cstate)
         * routine first.
         *
         * As mentioned in comments in utils/rel.h, the in-same-transaction test
-        * is not completely reliable, since in rare cases rd_createSubid or
-        * rd_newRelfilenodeSubid can be cleared before the end of the transaction.
-        * However this is OK since at worst we will fail to make the optimization.
+        * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
+        * can be cleared before the end of the transaction. The exact case is
+        * when a relation sets a new relfilenode twice in same transaction, yet
+        * the second one fails in an aborted subtransaction, e.g.
+        *
+        * BEGIN;
+        * TRUNCATE t;
+        * SAVEPOINT save;
+        * TRUNCATE t;
+        * ROLLBACK TO save;
+        * COPY ...
         *
         * Also, if the target file is new-in-transaction, we assume that checking
         * FSM for free space is a waste of time, even if we must use WAL because
@@ -1902,6 +2393,7 @@ CopyFrom(CopyState cstate)
         * no additional work to enforce that.
         *----------
         */
+       /* createSubid is creation check, newRelfilenodeSubid is truncation check */
        if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
                cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
        {
@@ -1910,29 +2402,48 @@ CopyFrom(CopyState cstate)
                        hi_options |= HEAP_INSERT_SKIP_WAL;
        }
 
+       /*
+        * Optimize if new relfilenode was created in this subxact or one of its
+        * committed children and we won't see those rows later as part of an
+        * earlier scan or command. This ensures that if this subtransaction
+        * aborts then the frozen rows won't be visible after xact cleanup. Note
+        * that the stronger test of exactly which subtransaction created it is
+        * crucial for correctness of this optimization.
+        */
+       if (cstate->freeze)
+       {
+               if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+                                        errmsg("cannot perform FREEZE because of prior transaction activity")));
+
+               if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
+                cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
+
+               hi_options |= HEAP_INSERT_FROZEN;
+       }
+
        /*
         * We need a ResultRelInfo so we can use the regular executor's
         * index-entry-making machinery.  (There used to be a huge amount of code
         * here that basically duplicated execUtils.c ...)
         */
        resultRelInfo = makeNode(ResultRelInfo);
-       resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
-       resultRelInfo->ri_RelationDesc = cstate->rel;
-       resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
-       if (resultRelInfo->ri_TrigDesc)
-       {
-               resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
-                       palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
-               resultRelInfo->ri_TrigWhenExprs = (List **)
-                       palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(List *));
-       }
-       resultRelInfo->ri_TrigInstrument = NULL;
+       InitResultRelInfo(resultRelInfo,
+                                         cstate->rel,
+                                         1,            /* dummy rangetable index */
+                                         NULL,
+                                         0);
 
-       ExecOpenIndices(resultRelInfo);
+       ExecOpenIndices(resultRelInfo, false);
 
        estate->es_result_relations = resultRelInfo;
        estate->es_num_result_relations = 1;
        estate->es_result_relation_info = resultRelInfo;
+       estate->es_range_table = cstate->range_table;
 
        /* Set up a tuple slot too */
        myslot = ExecInitExtraTupleSlot(estate);
@@ -1940,11 +2451,35 @@ CopyFrom(CopyState cstate)
        /* Triggers might need a slot as well */
        estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
 
+       /*
+        * It's more efficient to prepare a bunch of tuples for insertion, and
+        * insert them in one heap_multi_insert() call, than call heap_insert()
+        * separately for every tuple. However, we can't do that if there are
+        * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+        * expressions. Such triggers or expressions might query the table we're
+        * inserting to, and act differently if the tuples that have already been
+        * processed and prepared for insertion are not there.  We also can't do
+        * it if the table is partitioned.
+        */
+       if ((resultRelInfo->ri_TrigDesc != NULL &&
+                (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+                 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+               cstate->partition_dispatch_info != NULL ||
+               cstate->volatile_defexprs)
+       {
+               useHeapMultiInsert = false;
+       }
+       else
+       {
+               useHeapMultiInsert = true;
+               bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+       }
+
        /* Prepare to catch AFTER triggers. */
        AfterTriggerBeginQuery();
 
        /*
-        * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
+        * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
         * should do this for COPY, since it's not really an "INSERT" statement as
         * such. However, executing these triggers maintains consistency with the
         * EACH ROW triggers that we already fire on COPY.
@@ -1958,10 +2493,10 @@ CopyFrom(CopyState cstate)
        econtext = GetPerTupleExprContext(estate);
 
        /* Set up callback to identify error line number */
-       errcontext.callback = CopyFromErrorCallback;
-       errcontext.arg = (void *) cstate;
-       errcontext.previous = error_context_stack;
-       error_context_stack = &errcontext;
+       errcallback.callback = CopyFromErrorCallback;
+       errcallback.arg = (void *) cstate;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
 
        for (;;)
        {
@@ -1971,8 +2506,15 @@ CopyFrom(CopyState cstate)
 
                CHECK_FOR_INTERRUPTS();
 
-               /* Reset the per-tuple exprcontext */
-               ResetPerTupleExprContext(estate);
+               if (nBufferedTuples == 0)
+               {
+                       /*
+                        * Reset the per-tuple exprcontext. We can only do this if the
+                        * tuple buffer is empty. (Calling the context the per-tuple
+                        * memory context is a bit of a misnomer now.)
+                        */
+                       ResetPerTupleExprContext(estate);
+               }
 
                /* Switch into its memory context */
                MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -1986,6 +2528,12 @@ CopyFrom(CopyState cstate)
                if (loaded_oid != InvalidOid)
                        HeapTupleSetOid(tuple, loaded_oid);
 
+               /*
+                * Constraints might reference the tableoid column, so initialize
+                * t_tableOid before evaluating them.
+                */
+               tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
                /* Triggers and stuff need to be invoked in query context. */
                MemoryContextSwitchTo(oldcontext);
 
@@ -1993,6 +2541,81 @@ CopyFrom(CopyState cstate)
                slot = myslot;
                ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
+               /* Determine the partition to heap_insert the tuple into */
+               if (cstate->partition_dispatch_info)
+               {
+                       int                     leaf_part_index;
+                       TupleConversionMap *map;
+
+                       /*
+                        * Away we go ... If we end up not finding a partition after all,
+                        * ExecFindPartition() does not return and errors out instead.
+                        * Otherwise, the returned value is to be used as an index into
+                        * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+                        * will get us the ResultRelInfo and TupleConversionMap for the
+                        * partition, respectively.
+                        */
+                       leaf_part_index = ExecFindPartition(resultRelInfo,
+                                                                                        cstate->partition_dispatch_info,
+                                                                                               slot,
+                                                                                               estate);
+                       Assert(leaf_part_index >= 0 &&
+                                  leaf_part_index < cstate->num_partitions);
+
+                       /*
+                        * If this tuple is mapped to a partition that is not same as the
+                        * previous one, we'd better make the bulk insert mechanism gets a
+                        * new buffer.
+                        */
+                       if (prev_leaf_part_index != leaf_part_index)
+                       {
+                               ReleaseBulkInsertStatePin(bistate);
+                               prev_leaf_part_index = leaf_part_index;
+                       }
+
+                       /*
+                        * Save the old ResultRelInfo and switch to the one corresponding
+                        * to the selected partition.
+                        */
+                       saved_resultRelInfo = resultRelInfo;
+                       resultRelInfo = cstate->partitions + leaf_part_index;
+
+                       /* We do not yet have a way to insert into a foreign partition */
+                       if (resultRelInfo->ri_FdwRoutine)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot route inserted tuples to a foreign table")));
+
+                       /*
+                        * For ExecInsertIndexTuples() to work on the partition's indexes
+                        */
+                       estate->es_result_relation_info = resultRelInfo;
+
+                       /*
+                        * We might need to convert from the parent rowtype to the
+                        * partition rowtype.
+                        */
+                       map = cstate->partition_tupconv_maps[leaf_part_index];
+                       if (map)
+                       {
+                               Relation        partrel = resultRelInfo->ri_RelationDesc;
+
+                               tuple = do_convert_tuple(tuple, map);
+
+                               /*
+                                * We must use the partition's tuple descriptor from this
+                                * point on.  Use a dedicated slot from this point on until
+                                * we're finished dealing with the partition.
+                                */
+                               slot = cstate->partition_tuple_slot;
+                               Assert(slot != NULL);
+                               ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
+                               ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+                       }
+
+                       tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+               }
+
                skip_tuple = false;
 
                /* BEFORE ROW INSERT Triggers */
@@ -2009,24 +2632,67 @@ CopyFrom(CopyState cstate)
 
                if (!skip_tuple)
                {
-                       List       *recheckIndexes = NIL;
+                       if (resultRelInfo->ri_TrigDesc &&
+                               resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+                       {
+                               /* Pass the data to the INSTEAD ROW INSERT trigger */
+                               ExecIRInsertTriggers(estate, resultRelInfo, slot);
+                       }
+                       else
+                       {
+                               /* Check the constraints of the tuple */
+                               if (cstate->rel->rd_att->constr ||
+                                       resultRelInfo->ri_PartitionCheck)
+                                       ExecConstraints(resultRelInfo, slot, estate);
 
-                       /* Check the constraints of the tuple */
-                       if (cstate->rel->rd_att->constr)
-                               ExecConstraints(resultRelInfo, slot, estate);
+                               if (useHeapMultiInsert)
+                               {
+                                       /* Add this tuple to the tuple buffer */
+                                       if (nBufferedTuples == 0)
+                                               firstBufferedLineNo = cstate->cur_lineno;
+                                       bufferedTuples[nBufferedTuples++] = tuple;
+                                       bufferedTuplesSize += tuple->t_len;
 
-                       /* OK, store the tuple and create index entries for it */
-                       heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+                                       /*
+                                        * If the buffer filled up, flush it.  Also flush if the
+                                        * total size of all the tuples in the buffer becomes
+                                        * large, to avoid using large amounts of memory for the
+                                        * buffer when the tuples are exceptionally wide.
+                                        */
+                                       if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+                                               bufferedTuplesSize > 65535)
+                                       {
+                                               CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                                                                       resultRelInfo, myslot, bistate,
+                                                                                       nBufferedTuples, bufferedTuples,
+                                                                                       firstBufferedLineNo);
+                                               nBufferedTuples = 0;
+                                               bufferedTuplesSize = 0;
+                                       }
+                               }
+                               else
+                               {
+                                       List       *recheckIndexes = NIL;
 
-                       if (resultRelInfo->ri_NumIndices > 0)
-                               recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                                                                          estate);
+                                       /* OK, store the tuple and create index entries for it */
+                                       heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
+                                                               hi_options, bistate);
 
-                       /* AFTER ROW INSERT Triggers */
-                       ExecARInsertTriggers(estate, resultRelInfo, tuple,
-                                                                recheckIndexes);
+                                       if (resultRelInfo->ri_NumIndices > 0)
+                                               recheckIndexes = ExecInsertIndexTuples(slot,
+                                                                                                                       &(tuple->t_self),
+                                                                                                                          estate,
+                                                                                                                          false,
+                                                                                                                          NULL,
+                                                                                                                          NIL);
 
-                       list_free(recheckIndexes);
+                                       /* AFTER ROW INSERT Triggers */
+                                       ExecARInsertTriggers(estate, resultRelInfo, tuple,
+                                                                                recheckIndexes);
+
+                                       list_free(recheckIndexes);
+                               }
+                       }
 
                        /*
                         * We count only tuples not suppressed by a BEFORE INSERT trigger;
@@ -2034,16 +2700,36 @@ CopyFrom(CopyState cstate)
                         * tuples inserted by an INSERT command.
                         */
                        processed++;
+
+                       if (saved_resultRelInfo)
+                       {
+                               resultRelInfo = saved_resultRelInfo;
+                               estate->es_result_relation_info = resultRelInfo;
+                       }
                }
        }
 
+       /* Flush any remaining buffered tuples */
+       if (nBufferedTuples > 0)
+               CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                                       resultRelInfo, myslot, bistate,
+                                                       nBufferedTuples, bufferedTuples,
+                                                       firstBufferedLineNo);
+
        /* Done, clean up */
-       error_context_stack = errcontext.previous;
+       error_context_stack = errcallback.previous;
 
        FreeBulkInsertState(bistate);
 
        MemoryContextSwitchTo(oldcontext);
 
+       /*
+        * In the old protocol, tell pqcomm that we can process normal protocol
+        * messages again.
+        */
+       if (cstate->copy_dest == COPY_OLD_FE)
+               pq_endmsgread();
+
        /* Execute AFTER STATEMENT insertion triggers */
        ExecASInsertTriggers(estate, resultRelInfo);
 
@@ -2057,6 +2743,39 @@ CopyFrom(CopyState cstate)
 
        ExecCloseIndices(resultRelInfo);
 
+       /* Close all the partitioned tables, leaf partitions, and their indices */
+       if (cstate->partition_dispatch_info)
+       {
+               int                     i;
+
+               /*
+                * Remember cstate->partition_dispatch_info[0] corresponds to the root
+                * partitioned table, which we must not try to close, because it is
+                * the main target table of COPY that will be closed eventually by
+                * DoCopy().  Also, tupslot is NULL for the root partitioned table.
+                */
+               for (i = 1; i < cstate->num_dispatch; i++)
+               {
+                       PartitionDispatch pd = cstate->partition_dispatch_info[i];
+
+                       heap_close(pd->reldesc, NoLock);
+                       ExecDropSingleTupleTableSlot(pd->tupslot);
+               }
+               for (i = 0; i < cstate->num_partitions; i++)
+               {
+                       ResultRelInfo *resultRelInfo = cstate->partitions + i;
+
+                       ExecCloseIndices(resultRelInfo);
+                       heap_close(resultRelInfo->ri_RelationDesc, NoLock);
+               }
+
+               /* Release the standalone partition tuple descriptor */
+               ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
+       }
+
+       /* Close any trigger target relations */
+       ExecCleanUpTriggerState(estate);
+
        FreeExecutorState(estate);
 
        /*
@@ -2069,6 +2788,84 @@ CopyFrom(CopyState cstate)
        return processed;
 }
 
+/*
+ * A subroutine of CopyFrom, to write the current batch of buffered heap
+ * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
+ * triggers.
+ */
+static void
+CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
+                                       int hi_options, ResultRelInfo *resultRelInfo,
+                                       TupleTableSlot *myslot, BulkInsertState bistate,
+                                       int nBufferedTuples, HeapTuple *bufferedTuples,
+                                       int firstBufferedLineNo)
+{
+       MemoryContext oldcontext;
+       int                     i;
+       int                     save_cur_lineno;
+
+       /*
+        * Print error context information correctly, if one of the operations
+        * below fail.
+        */
+       cstate->line_buf_valid = false;
+       save_cur_lineno = cstate->cur_lineno;
+
+       /*
+        * heap_multi_insert leaks memory, so switch to short-lived memory context
+        * before calling it.
+        */
+       oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+       heap_multi_insert(cstate->rel,
+                                         bufferedTuples,
+                                         nBufferedTuples,
+                                         mycid,
+                                         hi_options,
+                                         bistate);
+       MemoryContextSwitchTo(oldcontext);
+
+       /*
+        * If there are any indexes, update them for all the inserted tuples, and
+        * run AFTER ROW INSERT triggers.
+        */
+       if (resultRelInfo->ri_NumIndices > 0)
+       {
+               for (i = 0; i < nBufferedTuples; i++)
+               {
+                       List       *recheckIndexes;
+
+                       cstate->cur_lineno = firstBufferedLineNo + i;
+                       ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+                       recheckIndexes =
+                               ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+                                                                         estate, false, NULL, NIL);
+                       ExecARInsertTriggers(estate, resultRelInfo,
+                                                                bufferedTuples[i],
+                                                                recheckIndexes);
+                       list_free(recheckIndexes);
+               }
+       }
+
+       /*
+        * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+        * anyway.
+        */
+       else if (resultRelInfo->ri_TrigDesc != NULL &&
+                        resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+       {
+               for (i = 0; i < nBufferedTuples; i++)
+               {
+                       cstate->cur_lineno = firstBufferedLineNo + i;
+                       ExecARInsertTriggers(estate, resultRelInfo,
+                                                                bufferedTuples[i],
+                                                                NIL);
+               }
+       }
+
+       /* reset cur_lineno to where we were */
+       cstate->cur_lineno = save_cur_lineno;
+}
+
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -2080,8 +2877,11 @@ CopyFrom(CopyState cstate)
  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
  */
 CopyState
-BeginCopyFrom(Relation rel,
+BeginCopyFrom(ParseState *pstate,
+                         Relation rel,
                          const char *filename,
+                         bool is_program,
+                         copy_data_source_cb data_source_cb,
                          List *attnamelist,
                          List *options)
 {
@@ -2098,8 +2898,9 @@ BeginCopyFrom(Relation rel,
        int                *defmap;
        ExprState **defexprs;
        MemoryContext oldcontext;
+       bool            volatile_defexprs;
 
-       cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
+       cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
        /* Initialize state variables */
@@ -2117,10 +2918,15 @@ BeginCopyFrom(Relation rel,
        cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
        cstate->raw_buf_index = cstate->raw_buf_len = 0;
 
+       /* Assign range table, we'll need it in CopyFrom. */
+       if (pstate)
+               cstate->range_table = pstate->p_rtable;
+
        tupDesc = RelationGetDescr(cstate->rel);
        attr = tupDesc->attrs;
        num_phys_attrs = tupDesc->natts;
        num_defaults = 0;
+       volatile_defexprs = false;
 
        /*
         * Pick up the required catalog information for each attribute in the
@@ -2153,15 +2959,34 @@ BeginCopyFrom(Relation rel,
                {
                        /* attribute is NOT to be copied from input */
                        /* use default value if one exists */
-                       Node       *defexpr = build_column_default(cstate->rel, attnum);
+                       Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
+                                                                                                                               attnum);
 
                        if (defexpr != NULL)
                        {
-                               /* Initialize expressions in copycontext. */
-                               defexprs[num_defaults] = ExecInitExpr(
-                                                                expression_planner((Expr *) defexpr), NULL);
+                               /* Run the expression through planner */
+                               defexpr = expression_planner(defexpr);
+
+                               /* Initialize executable expression in copycontext */
+                               defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
                                defmap[num_defaults] = attnum - 1;
                                num_defaults++;
+
+                               /*
+                                * If a default expression looks at the table being loaded,
+                                * then it could give the wrong answer when using
+                                * multi-insert. Since database access can be dynamic this is
+                                * hard to test for exactly, so we use the much wider test of
+                                * whether the default expression is volatile. We allow for
+                                * the special case of when the default expression is the
+                                * nextval() of a sequence which in this specific case is
+                                * known to be safe for use with the multi-insert
+                                * optimization. Hence we use this special case function
+                                * checker rather than the standard check for
+                                * contain_volatile_functions().
+                                */
+                               if (!volatile_defexprs)
+                                       volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
                        }
                }
        }
@@ -2171,10 +2996,18 @@ BeginCopyFrom(Relation rel,
        cstate->typioparams = typioparams;
        cstate->defmap = defmap;
        cstate->defexprs = defexprs;
+       cstate->volatile_defexprs = volatile_defexprs;
        cstate->num_defaults = num_defaults;
+       cstate->is_program = is_program;
 
-       if (pipe)
+       if (data_source_cb)
        {
+               cstate->copy_dest = COPY_CALLBACK;
+               cstate->data_source_cb = data_source_cb;
+       }
+       else if (pipe)
+       {
+               Assert(!is_program);    /* the grammar does not allow this */
                if (whereToSendOutput == DestRemote)
                        ReceiveCopyBegin(cstate);
                else
@@ -2182,22 +3015,47 @@ BeginCopyFrom(Relation rel,
        }
        else
        {
-               struct stat st;
-
                cstate->filename = pstrdup(filename);
-               cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
 
-               if (cstate->copy_file == NULL)
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not open file \"%s\" for reading: %m",
-                                                       cstate->filename)));
+               if (cstate->is_program)
+               {
+                       cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
+                       if (cstate->copy_file == NULL)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not execute command \"%s\": %m",
+                                                               cstate->filename)));
+               }
+               else
+               {
+                       struct stat st;
 
-               fstat(fileno(cstate->copy_file), &st);
-               if (S_ISDIR(st.st_mode))
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                        errmsg("\"%s\" is a directory", cstate->filename)));
+                       cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+                       if (cstate->copy_file == NULL)
+                       {
+                               /* copy errno because ereport subfunctions might change it */
+                               int                     save_errno = errno;
+
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not open file \"%s\" for reading: %m",
+                                                               cstate->filename),
+                                                (save_errno == ENOENT || save_errno == EACCES) ?
+                                                errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
+                                                                "You may want a client-side facility such as psql's \\copy.") : 0));
+                       }
+
+                       if (fstat(fileno(cstate->copy_file), &st))
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not stat file \"%s\": %m",
+                                                               cstate->filename)));
+
+                       if (S_ISDIR(st.st_mode))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                                errmsg("\"%s\" is a directory", cstate->filename)));
+               }
        }
 
        if (!cstate->binary)
@@ -2418,11 +3276,35 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
                                                                NameStr(attr[m]->attname))));
                        string = field_strings[fieldno++];
 
-                       if (cstate->csv_mode && string == NULL &&
-                               cstate->force_notnull_flags[m])
+                       if (cstate->convert_select_flags &&
+                               !cstate->convert_select_flags[m])
+                       {
+                               /* ignore input field, leaving column as NULL */
+                               continue;
+                       }
+
+                       if (cstate->csv_mode)
                        {
-                               /* Go ahead and read the NULL string */
-                               string = cstate->null_print;
+                               if (string == NULL &&
+                                       cstate->force_notnull_flags[m])
+                               {
+                                       /*
+                                        * FORCE_NOT_NULL option is set and column is NULL -
+                                        * convert it to the NULL string.
+                                        */
+                                       string = cstate->null_print;
+                               }
+                               else if (string != NULL && cstate->force_null_flags[m]
+                                                && strcmp(string, cstate->null_print) == 0)
+                               {
+                                       /*
+                                        * FORCE_NULL option is set and column matches the NULL
+                                        * string. It must have been quoted, or otherwise the
+                                        * string would already have been set to NULL. Convert it
+                                        * to NULL as specified.
+                                        */
+                                       string = NULL;
+                               }
                        }
 
                        cstate->cur_attname = NameStr(attr[m]->attname);
@@ -2462,7 +3344,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
                         * if client chooses to send that now.
                         *
                         * Note that we MUST NOT try to read more data in an old-protocol
-                        * copy, since there is no protocol-level EOF marker then.      We
+                        * copy, since there is no protocol-level EOF marker then.  We
                         * could go either way for copy from file, but choose to throw
                         * error if there's data after the EOF marker, for consistency
                         * with the new-protocol case.
@@ -2524,7 +3406,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 
        /*
         * Now compute and insert any defaults available for the columns not
-        * provided by the input data.  Anything not processed here or above will
+        * provided by the input data.  Anything not processed here or above will
         * remain NULL.
         */
        for (i = 0; i < num_defaults; i++)
@@ -2537,7 +3419,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
                values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
-                                                                                &nulls[defmap[i]], NULL);
+                                                                                &nulls[defmap[i]]);
        }
 
        return true;
@@ -2559,7 +3441,7 @@ EndCopyFrom(CopyState cstate)
  * server encoding.
  *
  * Result is true if read was terminated by EOF, false if terminated
- * by newline. The terminating newline or EOF marker is not included
+ * by newline.  The terminating newline or EOF marker is not included
  * in the final value of line_buf.
  */
 static bool
@@ -2568,6 +3450,7 @@ CopyReadLine(CopyState cstate)
        bool            result;
 
        resetStringInfo(&cstate->line_buf);
+       cstate->line_buf_valid = true;
 
        /* Mark that encoding conversion hasn't occurred yet */
        cstate->line_buf_converted = false;
@@ -2714,7 +3597,7 @@ CopyReadLineText(CopyState cstate)
                 * of read-ahead and avoid the many calls to
                 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
                 * does not allow us to read too far ahead or we might read into the
-                * next data, so we read-ahead only as far we know we can.      One
+                * next data, so we read-ahead only as far we know we can.  One
                 * optimization would be to read-ahead four byte here if
                 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
                 * considering the size of the buffer.
@@ -2724,7 +3607,7 @@ CopyReadLineText(CopyState cstate)
                        REFILL_LINEBUF;
 
                        /*
-                        * Try to read some more data.  This will certainly reset
+                        * Try to read some more data.  This will certainly reset
                         * raw_buf_index to zero, and raw_buf_ptr must go with it.
                         */
                        if (!CopyLoadRawBuf(cstate))
@@ -2770,7 +3653,7 @@ CopyReadLineText(CopyState cstate)
                         * just use the char as a toggle. If they are different, we need
                         * to ensure that we only take account of an escape inside a
                         * quoted field and immediately preceding a quote char, and not
-                        * the second in a escape-escape sequence.
+                        * the second in an escape-escape sequence.
                         */
                        if (in_quote && c == escapec)
                                last_was_esc = !last_was_esc;
@@ -2782,7 +3665,7 @@ CopyReadLineText(CopyState cstate)
                        /*
                         * Updating the line count for embedded CR and/or LF chars is
                         * necessarily a little fragile - this test is probably about the
-                        * best we can do.      (XXX it's arguable whether we should do this
+                        * best we can do.  (XXX it's arguable whether we should do this
                         * at all --- is cur_lineno a physical or logical count?)
                         */
                        if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
@@ -2961,7 +3844,7 @@ CopyReadLineText(CopyState cstate)
                                 * after a backslash is special, so we skip over that second
                                 * character too.  If we didn't do that \\. would be
                                 * considered an eof-of copy, while in non-CSV mode it is a
-                                * literal backslash followed by a period.      In CSV mode,
+                                * literal backslash followed by a period.  In CSV mode,
                                 * backslashes are not special, so we want to process the
                                 * character after the backslash just like a normal character,
                                 * so we don't increment in those cases.
@@ -3064,7 +3947,7 @@ CopyReadAttributesText(CopyState cstate)
        /*
         * The de-escaped attributes will certainly not be longer than the input
         * data line, so we can just force attribute_buf to be large enough and
-        * then transfer data without any checks for enough space.      We need to do
+        * then transfer data without any checks for enough space.  We need to do
         * it this way because enlarging attribute_buf mid-stream would invalidate
         * pointers already stored into cstate->raw_fields[].
         */
@@ -3098,7 +3981,17 @@ CopyReadAttributesText(CopyState cstate)
                start_ptr = cur_ptr;
                cstate->raw_fields[fieldno] = output_ptr;
 
-               /* Scan data for field */
+               /*
+                * Scan data for field.
+                *
+                * Note that in this loop, we are scanning to locate the end of field
+                * and also speculatively performing de-escaping.  Once we find the
+                * end-of-field, we can match the raw field contents against the null
+                * marker string.  Only after that comparison fails do we know that
+                * de-escaping is actually the right thing to do; therefore we *must
+                * not* throw any syntax errors before we've done the null-marker
+                * check.
+                */
                for (;;)
                {
                        char            c;
@@ -3211,26 +4104,29 @@ CopyReadAttributesText(CopyState cstate)
                        *output_ptr++ = c;
                }
 
-               /* Terminate attribute value in output area */
-               *output_ptr++ = '\0';
-
-               /*
-                * If we de-escaped a non-7-bit-ASCII char, make sure we still have
-                * valid data for the db encoding. Avoid calling strlen here for the
-                * sake of efficiency.
-                */
-               if (saw_non_ascii)
-               {
-                       char       *fld = cstate->raw_fields[fieldno];
-
-                       pg_verifymbstr(fld, output_ptr - (fld + 1), false);
-               }
-
                /* Check whether raw input matched null marker */
                input_len = end_ptr - start_ptr;
                if (input_len == cstate->null_print_len &&
                        strncmp(start_ptr, cstate->null_print, input_len) == 0)
                        cstate->raw_fields[fieldno] = NULL;
+               else
+               {
+                       /*
+                        * At this point we know the field is supposed to contain data.
+                        *
+                        * If we de-escaped any non-7-bit-ASCII chars, make sure the
+                        * resulting string is valid data for the db encoding.
+                        */
+                       if (saw_non_ascii)
+                       {
+                               char       *fld = cstate->raw_fields[fieldno];
+
+                               pg_verifymbstr(fld, output_ptr - fld, false);
+                       }
+               }
+
+               /* Terminate attribute value in output area */
+               *output_ptr++ = '\0';
 
                fieldno++;
                /* Done if we hit EOL instead of a delim */
@@ -3281,7 +4177,7 @@ CopyReadAttributesCSV(CopyState cstate)
        /*
         * The de-escaped attributes will certainly not be longer than the input
         * data line, so we can just force attribute_buf to be large enough and
-        * then transfer data without any checks for enough space.      We need to do
+        * then transfer data without any checks for enough space.  We need to do
         * it this way because enlarging attribute_buf mid-stream would invalidate
         * pointers already stored into cstate->raw_fields[].
         */
@@ -3496,7 +4392,7 @@ CopyAttributeOutText(CopyState cstate, char *string)
        /*
         * We have to grovel through the string searching for control characters
         * and instances of the delimiter character.  In most cases, though, these
-        * are infrequent.      To avoid overhead from calling CopySendData once per
+        * are infrequent.  To avoid overhead from calling CopySendData once per
         * character, we dump out all characters between escaped characters in a
         * single call.  The loop invariant is that the data from "start" to "ptr"
         * can be sent literally, but hasn't yet been.
@@ -3805,7 +4701,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * copy_dest_receive --- receive one tuple
  */
-static void
+static bool
 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 {
        DR_copy    *myState = (DR_copy *) self;
@@ -3817,6 +4713,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
        /* And send the data */
        CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
        myState->processed++;
+
+       return true;
 }
 
 /*