X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fcommands%2Fcopy.c;h=84b1a54cb9b4ed81015ef96a30f7c01179750d99;hb=a6fd7b7a5f7bf3a8aa3f3d076cf09d922c1c6dd2;hp=57429035e895e5b74f158e005b726ea3d40915ce;hpb=bf50caf105a901c4f83ac1df3cdaf910c26694a4;p=postgresql diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 57429035e8..84b1a54cb9 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -3,7 +3,7 @@ * copy.c * Implements the COPY utility command * - * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * @@ -21,9 +21,10 @@ #include #include "access/heapam.h" +#include "access/htup_details.h" #include "access/sysattr.h" #include "access/xact.h" -#include "catalog/namespace.h" +#include "access/xlog.h" #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" @@ -33,15 +34,19 @@ #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "optimizer/clauses.h" #include "optimizer/planner.h" +#include "nodes/makefuncs.h" #include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" #include "tcop/tcopprot.h" -#include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/portal.h" +#include "utils/rel.h" +#include "utils/rls.h" #include "utils/snapmgr.h" @@ -54,9 +59,10 @@ */ typedef enum CopyDest { - COPY_FILE, /* to/from file */ + COPY_FILE, /* to/from file (or a piped program) */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ - COPY_NEW_FE /* to/from frontend (3.0 protocol) */ + COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ + COPY_CALLBACK /* to/from callback function */ } CopyDest; /* @@ -104,8 +110,11 @@ typedef struct CopyStateData QueryDesc *queryDesc; /* executable query to copy from */ List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDIN/STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_source_cb data_source_cb; /* function for reading data */ bool binary; /* binary format? */ bool oids; /* include OIDs? */ + bool freeze; /* freeze rows on loading? */ bool csv_mode; /* Comma Separated Value format? */ bool header_line; /* CSV header line? */ char *null_print; /* NULL marker string (server encoding!) */ @@ -115,10 +124,15 @@ typedef struct CopyStateData char *quote; /* CSV quote char (must be 1 byte) */ char *escape; /* CSV escape char (must be 1 byte) */ List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE QUOTE *? */ + bool force_quote_all; /* FORCE_QUOTE *? */ bool *force_quote_flags; /* per-column CSV FQ flags */ List *force_notnull; /* list of column names */ bool *force_notnull_flags; /* per-column CSV FNN flags */ + List *force_null; /* list of column names */ + bool *force_null_flags; /* per-column CSV FN flags */ + bool convert_selectively; /* do selective binary conversion? */ + List *convert_select; /* list of column names (can be NIL) */ + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -148,6 +162,15 @@ typedef struct CopyStateData Oid *typioparams; /* array of element types for in_functions */ int *defmap; /* array of default att numbers */ ExprState **defexprs; /* array of default att expressions */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; + + PartitionDispatch *partition_dispatch_info; + int num_dispatch; /* Number of entries in the above array */ + int num_partitions; /* Number of members in the following arrays */ + ResultRelInfo *partitions; /* Per partition result relation */ + TupleConversionMap **partition_tupconv_maps; + TupleTableSlot *partition_tuple_slot; /* * These variables are used to reduce overhead in textual COPY FROM. @@ -173,10 +196,11 @@ typedef struct CopyStateData */ StringInfoData line_buf; bool line_buf_converted; /* converted to server encoding? */ + bool line_buf_valid; /* contains the row being processed? */ /* * Finally, raw_buf holds raw data read from the data source (file or - * client connection). CopyReadLine parses this data sufficiently to + * client connection). CopyReadLine parses this data sufficiently to * locate line boundaries, then transfers the data to line_buf and * converts it. Note: we guarantee that there is a \0 at * raw_buf[raw_buf_len]. @@ -187,7 +211,7 @@ typedef struct CopyStateData int raw_buf_len; /* total # of bytes stored */ } CopyStateData; -/* DestReceiver for COPY (SELECT) TO */ +/* DestReceiver for COPY (query) TO */ typedef struct { DestReceiver pub; /* publicly-known function pointers */ @@ -202,7 +226,7 @@ typedef struct * function call overhead in tight COPY loops. * * We must use "if (1)" because the usual "do {...} while(0)" wrapper would - * prevent the continue/break processing from working. We end the "if (1)" + * prevent the continue/break processing from working. We end the "if (1)" * with "else ((void) 0)" to ensure the "if" does not unintentionally match * any "else" in the calling code, and to avoid any compiler warnings about * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros. @@ -265,17 +289,25 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query, - const char *queryString, List *attnamelist, List *options); +static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, + RawStmt *raw_query, Oid queryRelId, List *attnamelist, + List *options); static void EndCopy(CopyState cstate); -static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString, - const char *filename, List *attnamelist, List *options); +static void ClosePipeToProgram(CopyState cstate); +static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, + Oid queryRelId, const char *filename, bool is_program, + List *attnamelist, List *options); static void EndCopyTo(CopyState cstate); static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); -static uint64 CopyFrom(CopyState cstate); +static void CopyFromInsertBatch(CopyState cstate, EState *estate, + CommandId mycid, int hi_options, + ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, + BulkInsertState bistate, + int nBufferedTuples, HeapTuple *bufferedTuples, + int firstBufferedLineNo); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); @@ -295,7 +327,7 @@ static char *limit_printout_length(const char *str); static void SendCopyBegin(CopyState cstate); static void ReceiveCopyBegin(CopyState cstate); static void SendCopyEnd(CopyState cstate); -static void CopySendData(CopyState cstate, void *databuf, int datasize); +static void CopySendData(CopyState cstate, const void *databuf, int datasize); static void CopySendString(CopyState cstate, const char *str); static void CopySendChar(CopyState cstate, char c); static void CopySendEndOfRow(CopyState cstate); @@ -330,7 +362,7 @@ SendCopyBegin(CopyState cstate) pq_endmessage(&buf); cstate->copy_dest = COPY_NEW_FE; } - else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) + else { /* old way */ if (cstate->binary) @@ -342,18 +374,6 @@ SendCopyBegin(CopyState cstate) pq_startcopyout(); cstate->copy_dest = COPY_OLD_FE; } - else - { - /* very old way */ - if (cstate->binary) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY BINARY is not supported to stdout or from stdin"))); - pq_putemptymessage('B'); - /* grottiness needed for old COPY OUT protocol */ - pq_startcopyout(); - cstate->copy_dest = COPY_OLD_FE; - } } static void @@ -376,7 +396,7 @@ ReceiveCopyBegin(CopyState cstate) cstate->copy_dest = COPY_NEW_FE; cstate->fe_msgbuf = makeStringInfo(); } - else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2) + else { /* old way */ if (cstate->binary) @@ -384,16 +404,8 @@ ReceiveCopyBegin(CopyState cstate) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY BINARY is not supported to stdout or from stdin"))); pq_putemptymessage('G'); - cstate->copy_dest = COPY_OLD_FE; - } - else - { - /* very old way */ - if (cstate->binary) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY BINARY is not supported to stdout or from stdin"))); - pq_putemptymessage('D'); + /* any error in old protocol will make us lose sync */ + pq_startmsgread(); cstate->copy_dest = COPY_OLD_FE; } /* We *must* flush here to ensure FE knows it can send. */ @@ -430,9 +442,9 @@ SendCopyEnd(CopyState cstate) *---------- */ static void -CopySendData(CopyState cstate, void *databuf, int datasize) +CopySendData(CopyState cstate, const void *databuf, int datasize) { - appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize); + appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize); } static void @@ -465,12 +477,38 @@ CopySendEndOfRow(CopyState cstate) #endif } - (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, - 1, cstate->copy_file); - if (ferror(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write to COPY file: %m"))); + if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, + cstate->copy_file) != 1 || + ferror(cstate->copy_file)) + { + if (cstate->is_program) + { + if (errno == EPIPE) + { + /* + * The pipe will be closed automatically on error at + * the end of transaction, but we might get a better + * error message from the subprocess' exit code than + * just "Broken Pipe" + */ + ClosePipeToProgram(cstate); + + /* + * If ClosePipeToProgram() didn't throw an error, the + * program terminated normally, but closed the pipe + * first. Restore errno, and throw an error. + */ + errno = EPIPE; + } + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to COPY program: %m"))); + } + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to COPY file: %m"))); + } break; case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ @@ -493,6 +531,9 @@ CopySendEndOfRow(CopyState cstate) /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; + case COPY_CALLBACK: + Assert(false); /* Not yet supported. */ + break; } resetStringInfo(fe_msgbuf); @@ -502,7 +543,7 @@ CopySendEndOfRow(CopyState cstate) * CopyGetData reads data from the source (file or frontend) * * We attempt to read at least minread, and at most maxread, bytes from - * the source. The actual number of bytes read is returned; if this is + * the source. The actual number of bytes read is returned; if this is * less than minread, EOF was detected. * * Note: when copying from the frontend, we expect a proper EOF mark per @@ -539,7 +580,7 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) /* Only a \. terminator is legal EOF in old protocol */ ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection"))); + errmsg("unexpected EOF on client connection with an open transaction"))); } bytesread = minread; break; @@ -554,15 +595,18 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) int mtype; readmessage: + HOLD_CANCEL_INTERRUPTS(); + pq_startmsgread(); mtype = pq_getbyte(); if (mtype == EOF) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection"))); + errmsg("unexpected EOF on client connection with an open transaction"))); if (pq_getmessage(cstate->fe_msgbuf, 0)) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on client connection"))); + errmsg("unexpected EOF on client connection with an open transaction"))); + RESUME_CANCEL_INTERRUPTS(); switch (mtype) { case 'd': /* CopyData */ @@ -604,6 +648,9 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; + case COPY_CALLBACK: + bytesread = cstate->data_source_cb(databuf, minread, maxread); + break; } return bytesread; @@ -716,10 +763,11 @@ CopyLoadRawBuf(CopyState cstate) * * Either unload or reload contents of table , depending on . * ( = TRUE means we are inserting into the table.) In the "TO" case - * we also support copying the output of an arbitrary SELECT query. + * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE + * or DELETE query. * * If is false, transfer is between the table and the file named - * . Otherwise, transfer is between the table and our regular + * . Otherwise, transfer is between the table and our regular * input/output stream. The latter could be either stdin/stdout or a * socket, depending on whether we're running under Postmaster control. * @@ -729,30 +777,41 @@ CopyLoadRawBuf(CopyState cstate) * Do not allow the copy if user doesn't have proper permission to access * the table or the specifically requested columns. */ -uint64 -DoCopy(const CopyStmt *stmt, const char *queryString) +void +DoCopy(ParseState *pstate, const CopyStmt *stmt, + int stmt_location, int stmt_len, + uint64 *processed) { CopyState cstate; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); Relation rel; - uint64 processed; + Oid relid; + RawStmt *query = NULL; - /* Disallow file COPY except to superusers. */ + /* Disallow COPY to/from file or program except to superusers. */ if (!pipe && !superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to COPY to or from a file"), - errhint("Anyone can COPY to stdout or from stdin. " - "psql's \\copy command also works for anyone."))); + { + if (stmt->is_program) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to COPY to or from an external program"), + errhint("Anyone can COPY to stdout or from stdin. " + "psql's \\copy command also works for anyone."))); + else + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to COPY to or from a file"), + errhint("Anyone can COPY to stdout or from stdin. " + "psql's \\copy command also works for anyone."))); + } if (stmt->relation) { TupleDesc tupDesc; - AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); - RangeTblEntry *rte; List *attnums; ListCell *cur; + RangeTblEntry *rte; Assert(!stmt->query); @@ -760,11 +819,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString) rel = heap_openrv(stmt->relation, (is_from ? RowExclusiveLock : AccessShareLock)); - rte = makeNode(RangeTblEntry); - rte->rtekind = RTE_RELATION; - rte->relid = RelationGetRelid(rel); - rte->relkind = rel->rd_rel->relkind; - rte->requiredPerms = required_access; + relid = RelationGetRelid(rel); + + rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false); + rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT); tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist); @@ -774,35 +832,155 @@ DoCopy(const CopyStmt *stmt, const char *queryString) FirstLowInvalidHeapAttributeNumber; if (is_from) - rte->modifiedCols = bms_add_member(rte->modifiedCols, attno); + rte->insertedCols = bms_add_member(rte->insertedCols, attno); else rte->selectedCols = bms_add_member(rte->selectedCols, attno); } - ExecCheckRTPerms(list_make1(rte), true); + ExecCheckRTPerms(pstate->p_rtable, true); + + /* + * Permission check for row security policies. + * + * check_enable_rls will ereport(ERROR) if the user has requested + * something invalid and will otherwise indicate if we should enable + * RLS (returns RLS_ENABLED) or not for this COPY statement. + * + * If the relation has a row security policy and we are to apply it + * then perform a "query" copy and allow the normal query processing + * to handle the policies. + * + * If RLS is not enabled for this, then just fall through to the + * normal non-filtering relation handling. + */ + if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED) + { + SelectStmt *select; + ColumnRef *cr; + ResTarget *target; + RangeVar *from; + List *targetList = NIL; + + if (is_from) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY FROM not supported with row-level security"), + errhint("Use INSERT statements instead."))); + + /* + * Build target list + * + * If no columns are specified in the attribute list of the COPY + * command, then the target list is 'all' columns. Therefore, '*' + * should be used as the target list for the resulting SELECT + * statement. + * + * In the case that columns are specified in the attribute list, + * create a ColumnRef and ResTarget for each column and add them + * to the target list for the resulting SELECT statement. + */ + if (!stmt->attlist) + { + cr = makeNode(ColumnRef); + cr->fields = list_make1(makeNode(A_Star)); + cr->location = -1; + + target = makeNode(ResTarget); + target->name = NULL; + target->indirection = NIL; + target->val = (Node *) cr; + target->location = -1; + + targetList = list_make1(target); + } + else + { + ListCell *lc; + + foreach(lc, stmt->attlist) + { + /* + * Build the ColumnRef for each column. The ColumnRef + * 'fields' property is a String 'Value' node (see + * nodes/value.h) that corresponds to the column name + * respectively. + */ + cr = makeNode(ColumnRef); + cr->fields = list_make1(lfirst(lc)); + cr->location = -1; + + /* Build the ResTarget and add the ColumnRef to it. */ + target = makeNode(ResTarget); + target->name = NULL; + target->indirection = NIL; + target->val = (Node *) cr; + target->location = -1; + + /* Add each column to the SELECT statement's target list */ + targetList = lappend(targetList, target); + } + } + + /* + * Build RangeVar for from clause, fully qualified based on the + * relation which we have opened and locked. + */ + from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), + pstrdup(RelationGetRelationName(rel)), + -1); + + /* Build query */ + select = makeNode(SelectStmt); + select->targetList = targetList; + select->fromClause = list_make1(from); + + query = makeNode(RawStmt); + query->stmt = (Node *) select; + query->stmt_location = stmt_location; + query->stmt_len = stmt_len; + + /* + * Close the relation for now, but keep the lock on it to prevent + * changes between now and when we start the query-based COPY. + * + * We'll reopen it later as part of the query-based COPY. + */ + heap_close(rel, NoLock); + rel = NULL; + } } else { Assert(stmt->query); + query = makeNode(RawStmt); + query->stmt = stmt->query; + query->stmt_location = stmt_location; + query->stmt_len = stmt_len; + + relid = InvalidOid; rel = NULL; } if (is_from) { - /* check read-only transaction */ - if (XactReadOnly && rel->rd_backend != MyBackendId) + Assert(rel); + + /* check read-only transaction and parallel mode */ + if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); + PreventCommandIfParallelMode("COPY FROM"); - cstate = BeginCopyFrom(rel, stmt->filename, - stmt->attlist, stmt->options); - processed = CopyFrom(cstate); /* copy from file to database */ + cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, + NULL, stmt->attlist, stmt->options); + *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); } else { - cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename, + cstate = BeginCopyTo(pstate, rel, query, relid, + stmt->filename, stmt->is_program, stmt->attlist, stmt->options); - processed = DoCopyTo(cstate); /* copy from database to file */ + *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); } @@ -813,8 +991,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString) */ if (rel != NULL) heap_close(rel, (is_from ? NoLock : AccessShareLock)); - - return processed; } /* @@ -835,7 +1011,8 @@ DoCopy(const CopyStmt *stmt, const char *queryString) * self-consistency of the options list. */ void -ProcessCopyOptions(CopyState cstate, +ProcessCopyOptions(ParseState *pstate, + CopyState cstate, bool is_from, List *options) { @@ -851,7 +1028,7 @@ ProcessCopyOptions(CopyState cstate, /* Extract options from the statement node tree */ foreach(option, options) { - DefElem *defel = (DefElem *) lfirst(option); + DefElem *defel = lfirst_node(DefElem, option); if (strcmp(defel->defname, "format") == 0) { @@ -860,7 +1037,8 @@ ProcessCopyOptions(CopyState cstate, if (format_specified) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); format_specified = true; if (strcmp(fmt, "text") == 0) /* default format */ ; @@ -871,22 +1049,34 @@ ProcessCopyOptions(CopyState cstate, else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt))); + errmsg("COPY format \"%s\" not recognized", fmt), + parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "oids") == 0) { if (cstate->oids) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); cstate->oids = defGetBoolean(defel); } + else if (strcmp(defel->defname, "freeze") == 0) + { + if (cstate->freeze) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->freeze = defGetBoolean(defel); + } else if (strcmp(defel->defname, "delimiter") == 0) { if (cstate->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); cstate->delim = defGetString(defel); } else if (strcmp(defel->defname, "null") == 0) @@ -894,7 +1084,8 @@ ProcessCopyOptions(CopyState cstate, if (cstate->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); cstate->null_print = defGetString(defel); } else if (strcmp(defel->defname, "header") == 0) @@ -902,7 +1093,8 @@ ProcessCopyOptions(CopyState cstate, if (cstate->header_line) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); cstate->header_line = defGetBoolean(defel); } else if (strcmp(defel->defname, "quote") == 0) @@ -910,7 +1102,8 @@ ProcessCopyOptions(CopyState cstate, if (cstate->quote) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); cstate->quote = defGetString(defel); } else if (strcmp(defel->defname, "escape") == 0) @@ -918,7 +1111,8 @@ ProcessCopyOptions(CopyState cstate, if (cstate->escape) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); cstate->escape = defGetString(defel); } else if (strcmp(defel->defname, "force_quote") == 0) @@ -926,49 +1120,93 @@ ProcessCopyOptions(CopyState cstate, if (cstate->force_quote || cstate->force_quote_all) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); if (defel->arg && IsA(defel->arg, A_Star)) cstate->force_quote_all = true; else if (defel->arg && IsA(defel->arg, List)) - cstate->force_quote = (List *) defel->arg; + cstate->force_quote = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", - defel->defname))); + defel->defname), + parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "force_not_null") == 0) { if (cstate->force_notnull) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + if (defel->arg && IsA(defel->arg, List)) + cstate->force_notnull = castNode(List, defel->arg); + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument to option \"%s\" must be a list of column names", + defel->defname), + parser_errposition(pstate, defel->location))); + } + else if (strcmp(defel->defname, "force_null") == 0) + { + if (cstate->force_null) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); if (defel->arg && IsA(defel->arg, List)) - cstate->force_notnull = (List *) defel->arg; + cstate->force_null = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", - defel->defname))); + defel->defname), + parser_errposition(pstate, defel->location))); + } + else if (strcmp(defel->defname, "convert_selectively") == 0) + { + /* + * Undocumented, not-accessible-from-SQL option: convert only the + * named columns to binary form, storing the rest as NULLs. It's + * allowed for the column list to be NIL. + */ + if (cstate->convert_selectively) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->convert_selectively = true; + if (defel->arg == NULL || IsA(defel->arg, List)) + cstate->convert_select = castNode(List, defel->arg); + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument to option \"%s\" must be a list of column names", + defel->defname), + parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "encoding") == 0) { if (cstate->file_encoding >= 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); cstate->file_encoding = pg_char_to_encoding(defGetString(defel)); if (cstate->file_encoding < 0) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a valid encoding name", - defel->defname))); + defel->defname), + parser_errposition(pstate, defel->location))); } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("option \"%s\" not recognized", - defel->defname))); + defel->defname), + parser_errposition(pstate, defel->location))); } /* @@ -1090,6 +1328,17 @@ ProcessCopyOptions(CopyState cstate, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null only available using COPY FROM"))); + /* Check force_null */ + if (!cstate->csv_mode && cstate->force_null != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY force null available only in CSV mode"))); + + if (cstate->force_null != NIL && !is_from) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY force null only available using COPY FROM"))); + /* Don't allow the delimiter to appear in the null string. */ if (strchr(cstate->null_print, cstate->delim[0]) != NULL) ereport(ERROR, @@ -1120,10 +1369,11 @@ ProcessCopyOptions(CopyState cstate, * NULL values as . */ static CopyState -BeginCopy(bool is_from, +BeginCopy(ParseState *pstate, + bool is_from, Relation rel, - Node *raw_query, - const char *queryString, + RawStmt *raw_query, + Oid queryRelId, List *attnamelist, List *options) { @@ -1141,14 +1391,12 @@ BeginCopy(bool is_from, */ cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, "COPY", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + ALLOCSET_DEFAULT_SIZES); oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Extract options from the statement node tree */ - ProcessCopyOptions(cstate, is_from, options); + ProcessCopyOptions(pstate, cstate, is_from, options); /* Process the source/target relation or query */ if (rel) @@ -1165,6 +1413,30 @@ BeginCopy(bool is_from, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("table \"%s\" does not have OIDs", RelationGetRelationName(cstate->rel)))); + + /* Initialize state for CopyFrom tuple routing. */ + if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDispatch *partition_dispatch_info; + ResultRelInfo *partitions; + TupleConversionMap **partition_tupconv_maps; + TupleTableSlot *partition_tuple_slot; + int num_parted, + num_partitions; + + ExecSetupPartitionTupleRouting(rel, + &partition_dispatch_info, + &partitions, + &partition_tupconv_maps, + &partition_tuple_slot, + &num_parted, &num_partitions); + cstate->partition_dispatch_info = partition_dispatch_info; + cstate->num_dispatch = num_parted; + cstate->partitions = partitions; + cstate->num_partitions = num_partitions; + cstate->partition_tupconv_maps = partition_tupconv_maps; + cstate->partition_tuple_slot = partition_tuple_slot; + } } else { @@ -1176,14 +1448,14 @@ BeginCopy(bool is_from, Assert(!is_from); cstate->rel = NULL; - /* Don't allow COPY w/ OIDs from a select */ + /* Don't allow COPY w/ OIDs from a query */ if (cstate->oids) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("COPY (SELECT) WITH OIDS is not supported"))); + errmsg("COPY (query) WITH OIDS is not supported"))); /* - * Run parse analysis and rewrite. Note this also acquires sufficient + * Run parse analysis and rewrite. Note this also acquires sufficient * locks on the source table(s). * * Because the parser and planner tend to scribble on their input, we @@ -1192,25 +1464,95 @@ BeginCopy(bool is_from, * function and is executed repeatedly. (See also the same hack in * DECLARE CURSOR and PREPARE.) XXX FIXME someday. */ - rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query), - queryString, NULL, 0); + rewritten = pg_analyze_and_rewrite(copyObject(raw_query), + pstate->p_sourcetext, NULL, 0, + NULL); - /* We don't expect more or less than one result query */ - if (list_length(rewritten) != 1) - elog(ERROR, "unexpected rewrite result"); + /* check that we got back something we can work with */ + if (rewritten == NIL) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("DO INSTEAD NOTHING rules are not supported for COPY"))); + } + else if (list_length(rewritten) > 1) + { + ListCell *lc; - query = (Query *) linitial(rewritten); - Assert(query->commandType == CMD_SELECT); - Assert(query->utilityStmt == NULL); + /* examine queries to determine which error message to issue */ + foreach(lc, rewritten) + { + Query *q = lfirst_node(Query, lc); + + if (q->querySource == QSRC_QUAL_INSTEAD_RULE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("conditional DO INSTEAD rules are not supported for COPY"))); + if (q->querySource == QSRC_NON_INSTEAD_RULE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("DO ALSO rules are not supported for the COPY"))); + } - /* Query mustn't use INTO, either */ - if (query->intoClause) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("multi-statement DO INSTEAD rules are not supported for COPY"))); + } + + query = linitial_node(Query, rewritten); + + /* The grammar allows SELECT INTO, but we don't support that */ + if (query->utilityStmt != NULL && + IsA(query->utilityStmt, CreateTableAsStmt)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY (SELECT INTO) is not supported"))); + Assert(query->utilityStmt == NULL); + + /* + * Similarly the grammar doesn't enforce the presence of a RETURNING + * clause, but this is required here. + */ + if (query->commandType != CMD_SELECT && + query->returningList == NIL) + { + Assert(query->commandType == CMD_INSERT || + query->commandType == CMD_UPDATE || + query->commandType == CMD_DELETE); + + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY query must have a RETURNING clause"))); + } + /* plan the query */ - plan = planner(query, 0, NULL); + plan = pg_plan_query(query, 0, NULL); + + /* + * With row level security and a user using "COPY relation TO", we + * have to convert the "COPY relation TO" to a query-based COPY (eg: + * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add + * in any RLS clauses. + * + * When this happens, we are passed in the relid of the originally + * found relation (which we have locked). As the planner will look up + * the relation again, we double-check here to make sure it found the + * same one that we have locked. + */ + if (queryRelId != InvalidOid) + { + /* + * Note that with RLS involved there may be multiple relations, + * and while the one we need is almost certainly first, we don't + * make any guarantees of that in the planner, so check the whole + * list and make sure we find the original relation. + */ + if (!list_member_oid(plan->relationOids, queryRelId)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("relation referenced by COPY statement has changed"))); + } /* * Use a snapshot with an updated command ID to ensure this query sees @@ -1224,10 +1566,10 @@ BeginCopy(bool is_from, ((DR_copy *) dest)->cstate = cstate; /* Create a QueryDesc requesting no output */ - cstate->queryDesc = CreateQueryDesc(plan, queryString, + cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext, GetActiveSnapshot(), InvalidSnapshot, - dest, NULL, 0); + dest, NULL, NULL, 0); /* * Call ExecutorStart to prepare the plan for execution. @@ -1244,7 +1586,7 @@ BeginCopy(bool is_from, num_phys_attrs = tupDesc->natts; - /* Convert FORCE QUOTE name list to per-column flags, check validity */ + /* Convert FORCE_QUOTE name list to per-column flags, check validity */ cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); if (cstate->force_quote_all) { @@ -1267,13 +1609,13 @@ BeginCopy(bool is_from, if (!list_member_int(cstate->attnumlist, attnum)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("FORCE QUOTE column \"%s\" not referenced by COPY", + errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY", NameStr(tupDesc->attrs[attnum - 1]->attname)))); cstate->force_quote_flags[attnum - 1] = true; } } - /* Convert FORCE NOT NULL name list to per-column flags, check validity */ + /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); if (cstate->force_notnull) { @@ -1289,12 +1631,57 @@ BeginCopy(bool is_from, if (!list_member_int(cstate->attnumlist, attnum)) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY", + errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY", NameStr(tupDesc->attrs[attnum - 1]->attname)))); cstate->force_notnull_flags[attnum - 1] = true; } } + /* Convert FORCE_NULL name list to per-column flags, check validity */ + cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); + if (cstate->force_null) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null); + + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("FORCE_NULL column \"%s\" not referenced by COPY", + NameStr(tupDesc->attrs[attnum - 1]->attname)))); + cstate->force_null_flags[attnum - 1] = true; + } + } + + /* Convert convert_selectively name list to per-column flags */ + if (cstate->convert_selectively) + { + List *attnums; + ListCell *cur; + + cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); + + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select); + + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg_internal("selected column \"%s\" not referenced by COPY", + NameStr(tupDesc->attrs[attnum - 1]->attname)))); + cstate->convert_select_flags[attnum - 1] = true; + } + } + /* Use client encoding when ENCODING option is not specified. */ if (cstate->file_encoding < 0) cstate->file_encoding = pg_get_client_encoding(); @@ -1318,16 +1705,46 @@ BeginCopy(bool is_from, } /* - * Release resources allocated in a cstate for COPY TO/FROM. + * Closes the pipe to an external program, checking the pclose() return code. */ static void -EndCopy(CopyState cstate) +ClosePipeToProgram(CopyState cstate) { - if (cstate->filename != NULL && FreeFile(cstate->copy_file)) + int pclose_rc; + + Assert(cstate->is_program); + + pclose_rc = ClosePipeStream(cstate->copy_file); + if (pclose_rc == -1) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", - cstate->filename))); + errmsg("could not close pipe to external command: %m"))); + else if (pclose_rc != 0) + ereport(ERROR, + (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("program \"%s\" failed", + cstate->filename), + errdetail_internal("%s", wait_result_to_str(pclose_rc)))); +} + +/* + * Release resources allocated in a cstate for COPY TO/FROM. + */ +static void +EndCopy(CopyState cstate) +{ + if (cstate->is_program) + { + ClosePipeToProgram(cstate); + } + else + { + if (cstate->filename != NULL && FreeFile(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->filename))); + } MemoryContextDelete(cstate->copycontext); pfree(cstate); @@ -1337,10 +1754,12 @@ EndCopy(CopyState cstate) * Setup CopyState to read tuples from a table or a query for COPY TO. */ static CopyState -BeginCopyTo(Relation rel, - Node *query, - const char *queryString, +BeginCopyTo(ParseState *pstate, + Relation rel, + RawStmt *query, + Oid queryRelId, const char *filename, + bool is_program, List *attnamelist, List *options) { @@ -1356,6 +1775,12 @@ BeginCopyTo(Relation rel, errmsg("cannot copy from view \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) TO variant."))); + else if (rel->rd_rel->relkind == RELKIND_MATVIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy from materialized view \"%s\"", + RelationGetRelationName(rel)), + errhint("Try the COPY (SELECT ...) TO variant."))); else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -1367,6 +1792,12 @@ BeginCopyTo(Relation rel, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from sequence \"%s\"", RelationGetRelationName(rel)))); + else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy from partitioned table \"%s\"", + RelationGetRelationName(rel)), + errhint("Try the COPY (SELECT ...) TO variant."))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -1374,44 +1805,72 @@ BeginCopyTo(Relation rel, RelationGetRelationName(rel)))); } - cstate = BeginCopy(false, rel, query, queryString, attnamelist, options); + cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist, + options); oldcontext = MemoryContextSwitchTo(cstate->copycontext); if (pipe) { + Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } else { - mode_t oumask; /* Pre-existing umask value */ - struct stat st; + cstate->filename = pstrdup(filename); + cstate->is_program = is_program; - /* - * Prevent write to relative path ... too easy to shoot oneself in the - * foot by overwriting a database file ... - */ - if (!is_absolute_path(filename)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("relative path not allowed for COPY to file"))); + if (is_program) + { + cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W); + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not execute command \"%s\": %m", + cstate->filename))); + } + else + { + mode_t oumask; /* Pre-existing umask value */ + struct stat st; - cstate->filename = pstrdup(filename); - oumask = umask(S_IWGRP | S_IWOTH); - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); - umask(oumask); + /* + * Prevent write to relative path ... too easy to shoot oneself in + * the foot by overwriting a database file ... + */ + if (!is_absolute_path(filename)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("relative path not allowed for COPY to file"))); - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for writing: %m", - cstate->filename))); + oumask = umask(S_IWGRP | S_IWOTH); + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); + umask(oumask); + if (cstate->copy_file == NULL) + { + /* copy errno because ereport subfunctions might change it */ + int save_errno = errno; - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + cstate->filename), + (save_errno == ENOENT || save_errno == EACCES) ? + errhint("COPY TO instructs the PostgreSQL server process to write a file. " + "You may want a client-side facility such as psql's \\copy.") : 0)); + } + + if (fstat(fileno(cstate->copy_file), &st)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + cstate->filename))); + + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->filename))); + } } MemoryContextSwitchTo(oldcontext); @@ -1520,13 +1979,11 @@ CopyTo(CopyState cstate) * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside * datatype output routines, and should be faster than retail pfree's - * anyway. (We don't need a whole econtext as CopyFrom does.) + * anyway. (We don't need a whole econtext as CopyFrom does.) */ cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, "COPY TO", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + ALLOCSET_DEFAULT_SIZES); if (cstate->binary) { @@ -1534,7 +1991,7 @@ CopyTo(CopyState cstate) int32 tmp; /* Signature */ - CopySendData(cstate, (char *) BinarySignature, 11); + CopySendData(cstate, BinarySignature, 11); /* Flags field */ tmp = 0; if (cstate->oids) @@ -1612,7 +2069,7 @@ CopyTo(CopyState cstate) else { /* run the plan --- the dest receiver will send tuples */ - ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L); + ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true); processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } @@ -1764,8 +2221,18 @@ CopyFromErrorCallback(void *arg) } else { - /* error is relevant to a particular line */ - if (cstate->line_buf_converted || !cstate->need_transcoding) + /* + * Error is relevant to a particular line. + * + * If line_buf still contains the correct line, and it's already + * transcoded, print it. If it's still in a foreign encoding, it's + * quite likely that the error is precisely a failure to do + * encoding conversion (ie, bad data). We dare not try to convert + * it, and at present there's no way to regurgitate it without + * conversion. So we have to punt and just report the line number. + */ + if (cstate->line_buf_valid && + (cstate->line_buf_converted || !cstate->need_transcoding)) { char *lineval; @@ -1776,14 +2243,6 @@ CopyFromErrorCallback(void *arg) } else { - /* - * Here, the line buffer is still in a foreign encoding, and - * indeed it's quite likely that the error is precisely a - * failure to do encoding conversion (ie, bad data). We dare - * not try to convert it, and at present there's no way to - * regurgitate it without conversion. So we have to punt and - * just report the line number. - */ errcontext("COPY %s, line %d", cstate->cur_relname, cstate->cur_lineno); } @@ -1829,7 +2288,7 @@ limit_printout_length(const char *str) /* * Copy FROM file to relation. */ -static uint64 +uint64 CopyFrom(CopyState cstate) { HeapTuple tuple; @@ -1837,24 +2296,48 @@ CopyFrom(CopyState cstate) Datum *values; bool *nulls; ResultRelInfo *resultRelInfo; + ResultRelInfo *saved_resultRelInfo = NULL; EState *estate = CreateExecutorState(); /* for ExecConstraints() */ ExprContext *econtext; TupleTableSlot *myslot; MemoryContext oldcontext = CurrentMemoryContext; - ErrorContextCallback errcontext; + + ErrorContextCallback errcallback; CommandId mycid = GetCurrentCommandId(true); int hi_options = 0; /* start with default heap_insert options */ BulkInsertState bistate; uint64 processed = 0; + bool useHeapMultiInsert; + int nBufferedTuples = 0; + int prev_leaf_part_index = -1; + +#define MAX_BUFFERED_TUPLES 1000 + HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */ + Size bufferedTuplesSize = 0; + int firstBufferedLineNo = 0; Assert(cstate->rel); - if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) + /* + * The target must be a plain relation or have an INSTEAD OF INSERT row + * trigger. (Currently, such triggers are only allowed on views, so we + * only hint about them in the view case.) + */ + if (cstate->rel->rd_rel->relkind != RELKIND_RELATION && + cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE && + !(cstate->rel->trigdesc && + cstate->rel->trigdesc->trig_insert_instead_row)) { if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to view \"%s\"", + RelationGetRelationName(cstate->rel)), + errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger."))); + else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy to materialized view \"%s\"", RelationGetRelationName(cstate->rel)))); else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, @@ -1887,9 +2370,17 @@ CopyFrom(CopyState cstate) * routine first. * * As mentioned in comments in utils/rel.h, the in-same-transaction test - * is not completely reliable, since in rare cases rd_createSubid or - * rd_newRelfilenodeSubid can be cleared before the end of the transaction. - * However this is OK since at worst we will fail to make the optimization. + * is not always set correctly, since in rare cases rd_newRelfilenodeSubid + * can be cleared before the end of the transaction. The exact case is + * when a relation sets a new relfilenode twice in same transaction, yet + * the second one fails in an aborted subtransaction, e.g. + * + * BEGIN; + * TRUNCATE t; + * SAVEPOINT save; + * TRUNCATE t; + * ROLLBACK TO save; + * COPY ... * * Also, if the target file is new-in-transaction, we assume that checking * FSM for free space is a waste of time, even if we must use WAL because @@ -1902,6 +2393,7 @@ CopyFrom(CopyState cstate) * no additional work to enforce that. *---------- */ + /* createSubid is creation check, newRelfilenodeSubid is truncation check */ if (cstate->rel->rd_createSubid != InvalidSubTransactionId || cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId) { @@ -1910,29 +2402,48 @@ CopyFrom(CopyState cstate) hi_options |= HEAP_INSERT_SKIP_WAL; } + /* + * Optimize if new relfilenode was created in this subxact or one of its + * committed children and we won't see those rows later as part of an + * earlier scan or command. This ensures that if this subtransaction + * aborts then the frozen rows won't be visible after xact cleanup. Note + * that the stronger test of exactly which subtransaction created it is + * crucial for correctness of this optimization. + */ + if (cstate->freeze) + { + if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot perform FREEZE because of prior transaction activity"))); + + if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() && + cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction"))); + + hi_options |= HEAP_INSERT_FROZEN; + } + /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code * here that basically duplicated execUtils.c ...) */ resultRelInfo = makeNode(ResultRelInfo); - resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ - resultRelInfo->ri_RelationDesc = cstate->rel; - resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc); - if (resultRelInfo->ri_TrigDesc) - { - resultRelInfo->ri_TrigFunctions = (FmgrInfo *) - palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo)); - resultRelInfo->ri_TrigWhenExprs = (List **) - palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(List *)); - } - resultRelInfo->ri_TrigInstrument = NULL; + InitResultRelInfo(resultRelInfo, + cstate->rel, + 1, /* dummy rangetable index */ + NULL, + 0); - ExecOpenIndices(resultRelInfo); + ExecOpenIndices(resultRelInfo, false); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; estate->es_result_relation_info = resultRelInfo; + estate->es_range_table = cstate->range_table; /* Set up a tuple slot too */ myslot = ExecInitExtraTupleSlot(estate); @@ -1940,11 +2451,35 @@ CopyFrom(CopyState cstate) /* Triggers might need a slot as well */ estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate); + /* + * It's more efficient to prepare a bunch of tuples for insertion, and + * insert them in one heap_multi_insert() call, than call heap_insert() + * separately for every tuple. However, we can't do that if there are + * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default + * expressions. Such triggers or expressions might query the table we're + * inserting to, and act differently if the tuples that have already been + * processed and prepared for insertion are not there. We also can't do + * it if the table is partitioned. + */ + if ((resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_before_row || + resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) || + cstate->partition_dispatch_info != NULL || + cstate->volatile_defexprs) + { + useHeapMultiInsert = false; + } + else + { + useHeapMultiInsert = true; + bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple)); + } + /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); /* - * Check BEFORE STATEMENT insertion triggers. It's debateable whether we + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we * should do this for COPY, since it's not really an "INSERT" statement as * such. However, executing these triggers maintains consistency with the * EACH ROW triggers that we already fire on COPY. @@ -1958,10 +2493,10 @@ CopyFrom(CopyState cstate) econtext = GetPerTupleExprContext(estate); /* Set up callback to identify error line number */ - errcontext.callback = CopyFromErrorCallback; - errcontext.arg = (void *) cstate; - errcontext.previous = error_context_stack; - error_context_stack = &errcontext; + errcallback.callback = CopyFromErrorCallback; + errcallback.arg = (void *) cstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; for (;;) { @@ -1971,8 +2506,15 @@ CopyFrom(CopyState cstate) CHECK_FOR_INTERRUPTS(); - /* Reset the per-tuple exprcontext */ - ResetPerTupleExprContext(estate); + if (nBufferedTuples == 0) + { + /* + * Reset the per-tuple exprcontext. We can only do this if the + * tuple buffer is empty. (Calling the context the per-tuple + * memory context is a bit of a misnomer now.) + */ + ResetPerTupleExprContext(estate); + } /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); @@ -1986,6 +2528,12 @@ CopyFrom(CopyState cstate) if (loaded_oid != InvalidOid) HeapTupleSetOid(tuple, loaded_oid); + /* + * Constraints might reference the tableoid column, so initialize + * t_tableOid before evaluating them. + */ + tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + /* Triggers and stuff need to be invoked in query context. */ MemoryContextSwitchTo(oldcontext); @@ -1993,6 +2541,81 @@ CopyFrom(CopyState cstate) slot = myslot; ExecStoreTuple(tuple, slot, InvalidBuffer, false); + /* Determine the partition to heap_insert the tuple into */ + if (cstate->partition_dispatch_info) + { + int leaf_part_index; + TupleConversionMap *map; + + /* + * Away we go ... If we end up not finding a partition after all, + * ExecFindPartition() does not return and errors out instead. + * Otherwise, the returned value is to be used as an index into + * arrays mt_partitions[] and mt_partition_tupconv_maps[] that + * will get us the ResultRelInfo and TupleConversionMap for the + * partition, respectively. + */ + leaf_part_index = ExecFindPartition(resultRelInfo, + cstate->partition_dispatch_info, + slot, + estate); + Assert(leaf_part_index >= 0 && + leaf_part_index < cstate->num_partitions); + + /* + * If this tuple is mapped to a partition that is not same as the + * previous one, we'd better make the bulk insert mechanism gets a + * new buffer. + */ + if (prev_leaf_part_index != leaf_part_index) + { + ReleaseBulkInsertStatePin(bistate); + prev_leaf_part_index = leaf_part_index; + } + + /* + * Save the old ResultRelInfo and switch to the one corresponding + * to the selected partition. + */ + saved_resultRelInfo = resultRelInfo; + resultRelInfo = cstate->partitions + leaf_part_index; + + /* We do not yet have a way to insert into a foreign partition */ + if (resultRelInfo->ri_FdwRoutine) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot route inserted tuples to a foreign table"))); + + /* + * For ExecInsertIndexTuples() to work on the partition's indexes + */ + estate->es_result_relation_info = resultRelInfo; + + /* + * We might need to convert from the parent rowtype to the + * partition rowtype. + */ + map = cstate->partition_tupconv_maps[leaf_part_index]; + if (map) + { + Relation partrel = resultRelInfo->ri_RelationDesc; + + tuple = do_convert_tuple(tuple, map); + + /* + * We must use the partition's tuple descriptor from this + * point on. Use a dedicated slot from this point on until + * we're finished dealing with the partition. + */ + slot = cstate->partition_tuple_slot; + Assert(slot != NULL); + ExecSetSlotDescriptor(slot, RelationGetDescr(partrel)); + ExecStoreTuple(tuple, slot, InvalidBuffer, true); + } + + tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + } + skip_tuple = false; /* BEFORE ROW INSERT Triggers */ @@ -2009,24 +2632,67 @@ CopyFrom(CopyState cstate) if (!skip_tuple) { - List *recheckIndexes = NIL; + if (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_instead_row) + { + /* Pass the data to the INSTEAD ROW INSERT trigger */ + ExecIRInsertTriggers(estate, resultRelInfo, slot); + } + else + { + /* Check the constraints of the tuple */ + if (cstate->rel->rd_att->constr || + resultRelInfo->ri_PartitionCheck) + ExecConstraints(resultRelInfo, slot, estate); - /* Check the constraints of the tuple */ - if (cstate->rel->rd_att->constr) - ExecConstraints(resultRelInfo, slot, estate); + if (useHeapMultiInsert) + { + /* Add this tuple to the tuple buffer */ + if (nBufferedTuples == 0) + firstBufferedLineNo = cstate->cur_lineno; + bufferedTuples[nBufferedTuples++] = tuple; + bufferedTuplesSize += tuple->t_len; - /* OK, store the tuple and create index entries for it */ - heap_insert(cstate->rel, tuple, mycid, hi_options, bistate); + /* + * If the buffer filled up, flush it. Also flush if the + * total size of all the tuples in the buffer becomes + * large, to avoid using large amounts of memory for the + * buffer when the tuples are exceptionally wide. + */ + if (nBufferedTuples == MAX_BUFFERED_TUPLES || + bufferedTuplesSize > 65535) + { + CopyFromInsertBatch(cstate, estate, mycid, hi_options, + resultRelInfo, myslot, bistate, + nBufferedTuples, bufferedTuples, + firstBufferedLineNo); + nBufferedTuples = 0; + bufferedTuplesSize = 0; + } + } + else + { + List *recheckIndexes = NIL; - if (resultRelInfo->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), - estate); + /* OK, store the tuple and create index entries for it */ + heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid, + hi_options, bistate); - /* AFTER ROW INSERT Triggers */ - ExecARInsertTriggers(estate, resultRelInfo, tuple, - recheckIndexes); + if (resultRelInfo->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(slot, + &(tuple->t_self), + estate, + false, + NULL, + NIL); - list_free(recheckIndexes); + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, tuple, + recheckIndexes); + + list_free(recheckIndexes); + } + } /* * We count only tuples not suppressed by a BEFORE INSERT trigger; @@ -2034,16 +2700,36 @@ CopyFrom(CopyState cstate) * tuples inserted by an INSERT command. */ processed++; + + if (saved_resultRelInfo) + { + resultRelInfo = saved_resultRelInfo; + estate->es_result_relation_info = resultRelInfo; + } } } + /* Flush any remaining buffered tuples */ + if (nBufferedTuples > 0) + CopyFromInsertBatch(cstate, estate, mycid, hi_options, + resultRelInfo, myslot, bistate, + nBufferedTuples, bufferedTuples, + firstBufferedLineNo); + /* Done, clean up */ - error_context_stack = errcontext.previous; + error_context_stack = errcallback.previous; FreeBulkInsertState(bistate); MemoryContextSwitchTo(oldcontext); + /* + * In the old protocol, tell pqcomm that we can process normal protocol + * messages again. + */ + if (cstate->copy_dest == COPY_OLD_FE) + pq_endmsgread(); + /* Execute AFTER STATEMENT insertion triggers */ ExecASInsertTriggers(estate, resultRelInfo); @@ -2057,6 +2743,39 @@ CopyFrom(CopyState cstate) ExecCloseIndices(resultRelInfo); + /* Close all the partitioned tables, leaf partitions, and their indices */ + if (cstate->partition_dispatch_info) + { + int i; + + /* + * Remember cstate->partition_dispatch_info[0] corresponds to the root + * partitioned table, which we must not try to close, because it is + * the main target table of COPY that will be closed eventually by + * DoCopy(). Also, tupslot is NULL for the root partitioned table. + */ + for (i = 1; i < cstate->num_dispatch; i++) + { + PartitionDispatch pd = cstate->partition_dispatch_info[i]; + + heap_close(pd->reldesc, NoLock); + ExecDropSingleTupleTableSlot(pd->tupslot); + } + for (i = 0; i < cstate->num_partitions; i++) + { + ResultRelInfo *resultRelInfo = cstate->partitions + i; + + ExecCloseIndices(resultRelInfo); + heap_close(resultRelInfo->ri_RelationDesc, NoLock); + } + + /* Release the standalone partition tuple descriptor */ + ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot); + } + + /* Close any trigger target relations */ + ExecCleanUpTriggerState(estate); + FreeExecutorState(estate); /* @@ -2069,6 +2788,84 @@ CopyFrom(CopyState cstate) return processed; } +/* + * A subroutine of CopyFrom, to write the current batch of buffered heap + * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT + * triggers. + */ +static void +CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, + int hi_options, ResultRelInfo *resultRelInfo, + TupleTableSlot *myslot, BulkInsertState bistate, + int nBufferedTuples, HeapTuple *bufferedTuples, + int firstBufferedLineNo) +{ + MemoryContext oldcontext; + int i; + int save_cur_lineno; + + /* + * Print error context information correctly, if one of the operations + * below fail. + */ + cstate->line_buf_valid = false; + save_cur_lineno = cstate->cur_lineno; + + /* + * heap_multi_insert leaks memory, so switch to short-lived memory context + * before calling it. + */ + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + heap_multi_insert(cstate->rel, + bufferedTuples, + nBufferedTuples, + mycid, + hi_options, + bistate); + MemoryContextSwitchTo(oldcontext); + + /* + * If there are any indexes, update them for all the inserted tuples, and + * run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + for (i = 0; i < nBufferedTuples; i++) + { + List *recheckIndexes; + + cstate->cur_lineno = firstBufferedLineNo + i; + ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false); + recheckIndexes = + ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self), + estate, false, NULL, NIL); + ExecARInsertTriggers(estate, resultRelInfo, + bufferedTuples[i], + recheckIndexes); + list_free(recheckIndexes); + } + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT triggers + * anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + resultRelInfo->ri_TrigDesc->trig_insert_after_row) + { + for (i = 0; i < nBufferedTuples; i++) + { + cstate->cur_lineno = firstBufferedLineNo + i; + ExecARInsertTriggers(estate, resultRelInfo, + bufferedTuples[i], + NIL); + } + } + + /* reset cur_lineno to where we were */ + cstate->cur_lineno = save_cur_lineno; +} + /* * Setup to read tuples from a file for COPY FROM. * @@ -2080,8 +2877,11 @@ CopyFrom(CopyState cstate) * Returns a CopyState, to be passed to NextCopyFrom and related functions. */ CopyState -BeginCopyFrom(Relation rel, +BeginCopyFrom(ParseState *pstate, + Relation rel, const char *filename, + bool is_program, + copy_data_source_cb data_source_cb, List *attnamelist, List *options) { @@ -2098,8 +2898,9 @@ BeginCopyFrom(Relation rel, int *defmap; ExprState **defexprs; MemoryContext oldcontext; + bool volatile_defexprs; - cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options); + cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options); oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Initialize state variables */ @@ -2117,10 +2918,15 @@ BeginCopyFrom(Relation rel, cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; + /* Assign range table, we'll need it in CopyFrom. */ + if (pstate) + cstate->range_table = pstate->p_rtable; + tupDesc = RelationGetDescr(cstate->rel); attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; num_defaults = 0; + volatile_defexprs = false; /* * Pick up the required catalog information for each attribute in the @@ -2153,15 +2959,34 @@ BeginCopyFrom(Relation rel, { /* attribute is NOT to be copied from input */ /* use default value if one exists */ - Node *defexpr = build_column_default(cstate->rel, attnum); + Expr *defexpr = (Expr *) build_column_default(cstate->rel, + attnum); if (defexpr != NULL) { - /* Initialize expressions in copycontext. */ - defexprs[num_defaults] = ExecInitExpr( - expression_planner((Expr *) defexpr), NULL); + /* Run the expression through planner */ + defexpr = expression_planner(defexpr); + + /* Initialize executable expression in copycontext */ + defexprs[num_defaults] = ExecInitExpr(defexpr, NULL); defmap[num_defaults] = attnum - 1; num_defaults++; + + /* + * If a default expression looks at the table being loaded, + * then it could give the wrong answer when using + * multi-insert. Since database access can be dynamic this is + * hard to test for exactly, so we use the much wider test of + * whether the default expression is volatile. We allow for + * the special case of when the default expression is the + * nextval() of a sequence which in this specific case is + * known to be safe for use with the multi-insert + * optimization. Hence we use this special case function + * checker rather than the standard check for + * contain_volatile_functions(). + */ + if (!volatile_defexprs) + volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr); } } } @@ -2171,10 +2996,18 @@ BeginCopyFrom(Relation rel, cstate->typioparams = typioparams; cstate->defmap = defmap; cstate->defexprs = defexprs; + cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; + cstate->is_program = is_program; - if (pipe) + if (data_source_cb) { + cstate->copy_dest = COPY_CALLBACK; + cstate->data_source_cb = data_source_cb; + } + else if (pipe) + { + Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput == DestRemote) ReceiveCopyBegin(cstate); else @@ -2182,22 +3015,47 @@ BeginCopyFrom(Relation rel, } else { - struct stat st; - cstate->filename = pstrdup(filename); - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - cstate->filename))); + if (cstate->is_program) + { + cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R); + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not execute command \"%s\": %m", + cstate->filename))); + } + else + { + struct stat st; - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); + if (cstate->copy_file == NULL) + { + /* copy errno because ereport subfunctions might change it */ + int save_errno = errno; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for reading: %m", + cstate->filename), + (save_errno == ENOENT || save_errno == EACCES) ? + errhint("COPY FROM instructs the PostgreSQL server process to read a file. " + "You may want a client-side facility such as psql's \\copy.") : 0)); + } + + if (fstat(fileno(cstate->copy_file), &st)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + cstate->filename))); + + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->filename))); + } } if (!cstate->binary) @@ -2418,11 +3276,35 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, NameStr(attr[m]->attname)))); string = field_strings[fieldno++]; - if (cstate->csv_mode && string == NULL && - cstate->force_notnull_flags[m]) + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (cstate->csv_mode) { - /* Go ahead and read the NULL string */ - string = cstate->null_print; + if (string == NULL && + cstate->force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - + * convert it to the NULL string. + */ + string = cstate->null_print; + } + else if (string != NULL && cstate->force_null_flags[m] + && strcmp(string, cstate->null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the + * string would already have been set to NULL. Convert it + * to NULL as specified. + */ + string = NULL; + } } cstate->cur_attname = NameStr(attr[m]->attname); @@ -2462,7 +3344,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, * if client chooses to send that now. * * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We + * copy, since there is no protocol-level EOF marker then. We * could go either way for copy from file, but choose to throw * error if there's data after the EOF marker, for consistency * with the new-protocol case. @@ -2524,7 +3406,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* * Now compute and insert any defaults available for the columns not - * provided by the input data. Anything not processed here or above will + * provided by the input data. Anything not processed here or above will * remain NULL. */ for (i = 0; i < num_defaults; i++) @@ -2537,7 +3419,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, - &nulls[defmap[i]], NULL); + &nulls[defmap[i]]); } return true; @@ -2559,7 +3441,7 @@ EndCopyFrom(CopyState cstate) * server encoding. * * Result is true if read was terminated by EOF, false if terminated - * by newline. The terminating newline or EOF marker is not included + * by newline. The terminating newline or EOF marker is not included * in the final value of line_buf. */ static bool @@ -2568,6 +3450,7 @@ CopyReadLine(CopyState cstate) bool result; resetStringInfo(&cstate->line_buf); + cstate->line_buf_valid = true; /* Mark that encoding conversion hasn't occurred yet */ cstate->line_buf_converted = false; @@ -2714,7 +3597,7 @@ CopyReadLineText(CopyState cstate) * of read-ahead and avoid the many calls to * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol * does not allow us to read too far ahead or we might read into the - * next data, so we read-ahead only as far we know we can. One + * next data, so we read-ahead only as far we know we can. One * optimization would be to read-ahead four byte here if * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it, * considering the size of the buffer. @@ -2724,7 +3607,7 @@ CopyReadLineText(CopyState cstate) REFILL_LINEBUF; /* - * Try to read some more data. This will certainly reset + * Try to read some more data. This will certainly reset * raw_buf_index to zero, and raw_buf_ptr must go with it. */ if (!CopyLoadRawBuf(cstate)) @@ -2770,7 +3653,7 @@ CopyReadLineText(CopyState cstate) * just use the char as a toggle. If they are different, we need * to ensure that we only take account of an escape inside a * quoted field and immediately preceding a quote char, and not - * the second in a escape-escape sequence. + * the second in an escape-escape sequence. */ if (in_quote && c == escapec) last_was_esc = !last_was_esc; @@ -2782,7 +3665,7 @@ CopyReadLineText(CopyState cstate) /* * Updating the line count for embedded CR and/or LF chars is * necessarily a little fragile - this test is probably about the - * best we can do. (XXX it's arguable whether we should do this + * best we can do. (XXX it's arguable whether we should do this * at all --- is cur_lineno a physical or logical count?) */ if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r')) @@ -2961,7 +3844,7 @@ CopyReadLineText(CopyState cstate) * after a backslash is special, so we skip over that second * character too. If we didn't do that \\. would be * considered an eof-of copy, while in non-CSV mode it is a - * literal backslash followed by a period. In CSV mode, + * literal backslash followed by a period. In CSV mode, * backslashes are not special, so we want to process the * character after the backslash just like a normal character, * so we don't increment in those cases. @@ -3064,7 +3947,7 @@ CopyReadAttributesText(CopyState cstate) /* * The de-escaped attributes will certainly not be longer than the input * data line, so we can just force attribute_buf to be large enough and - * then transfer data without any checks for enough space. We need to do + * then transfer data without any checks for enough space. We need to do * it this way because enlarging attribute_buf mid-stream would invalidate * pointers already stored into cstate->raw_fields[]. */ @@ -3098,7 +3981,17 @@ CopyReadAttributesText(CopyState cstate) start_ptr = cur_ptr; cstate->raw_fields[fieldno] = output_ptr; - /* Scan data for field */ + /* + * Scan data for field. + * + * Note that in this loop, we are scanning to locate the end of field + * and also speculatively performing de-escaping. Once we find the + * end-of-field, we can match the raw field contents against the null + * marker string. Only after that comparison fails do we know that + * de-escaping is actually the right thing to do; therefore we *must + * not* throw any syntax errors before we've done the null-marker + * check. + */ for (;;) { char c; @@ -3211,26 +4104,29 @@ CopyReadAttributesText(CopyState cstate) *output_ptr++ = c; } - /* Terminate attribute value in output area */ - *output_ptr++ = '\0'; - - /* - * If we de-escaped a non-7-bit-ASCII char, make sure we still have - * valid data for the db encoding. Avoid calling strlen here for the - * sake of efficiency. - */ - if (saw_non_ascii) - { - char *fld = cstate->raw_fields[fieldno]; - - pg_verifymbstr(fld, output_ptr - (fld + 1), false); - } - /* Check whether raw input matched null marker */ input_len = end_ptr - start_ptr; if (input_len == cstate->null_print_len && strncmp(start_ptr, cstate->null_print, input_len) == 0) cstate->raw_fields[fieldno] = NULL; + else + { + /* + * At this point we know the field is supposed to contain data. + * + * If we de-escaped any non-7-bit-ASCII chars, make sure the + * resulting string is valid data for the db encoding. + */ + if (saw_non_ascii) + { + char *fld = cstate->raw_fields[fieldno]; + + pg_verifymbstr(fld, output_ptr - fld, false); + } + } + + /* Terminate attribute value in output area */ + *output_ptr++ = '\0'; fieldno++; /* Done if we hit EOL instead of a delim */ @@ -3281,7 +4177,7 @@ CopyReadAttributesCSV(CopyState cstate) /* * The de-escaped attributes will certainly not be longer than the input * data line, so we can just force attribute_buf to be large enough and - * then transfer data without any checks for enough space. We need to do + * then transfer data without any checks for enough space. We need to do * it this way because enlarging attribute_buf mid-stream would invalidate * pointers already stored into cstate->raw_fields[]. */ @@ -3496,7 +4392,7 @@ CopyAttributeOutText(CopyState cstate, char *string) /* * We have to grovel through the string searching for control characters * and instances of the delimiter character. In most cases, though, these - * are infrequent. To avoid overhead from calling CopySendData once per + * are infrequent. To avoid overhead from calling CopySendData once per * character, we dump out all characters between escaped characters in a * single call. The loop invariant is that the data from "start" to "ptr" * can be sent literally, but hasn't yet been. @@ -3805,7 +4701,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) /* * copy_dest_receive --- receive one tuple */ -static void +static bool copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) { DR_copy *myState = (DR_copy *) self; @@ -3817,6 +4713,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* And send the data */ CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull); myState->processed++; + + return true; } /*