* copy.c
* Implements the COPY utility command
*
- * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
#include <arpa/inet.h>
#include "access/heapam.h"
+#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/xact.h"
-#include "catalog/namespace.h"
+#include "access/xlog.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "optimizer/clauses.h"
#include "optimizer/planner.h"
+#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
-#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/portal.h"
+#include "utils/rel.h"
+#include "utils/rls.h"
#include "utils/snapmgr.h"
*/
typedef enum CopyDest
{
- COPY_FILE, /* to/from file */
+ COPY_FILE, /* to/from file (or a piped program) */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
- COPY_NEW_FE /* to/from frontend (3.0 protocol) */
+ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
+ COPY_CALLBACK /* to/from callback function (reads use data_source_cb) */
} CopyDest;
/*
QueryDesc *queryDesc; /* executable query to copy from */
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDIN/STDOUT */
+ bool is_program; /* is 'filename' a program to popen? */
+ copy_data_source_cb data_source_cb; /* function for reading data */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
+ bool freeze; /* freeze rows on loading? */
bool csv_mode; /* Comma Separated Value format? */
bool header_line; /* CSV header line? */
char *null_print; /* NULL marker string (server encoding!) */
char *quote; /* CSV quote char (must be 1 byte) */
char *escape; /* CSV escape char (must be 1 byte) */
List *force_quote; /* list of column names */
- bool force_quote_all; /* FORCE QUOTE *? */
+ bool force_quote_all; /* FORCE_QUOTE *? */
bool *force_quote_flags; /* per-column CSV FQ flags */
List *force_notnull; /* list of column names */
bool *force_notnull_flags; /* per-column CSV FNN flags */
+ List *force_null; /* list of column names */
+ bool *force_null_flags; /* per-column CSV FN flags */
+ bool convert_selectively; /* do selective binary conversion? */
+ List *convert_select; /* list of column names (can be NIL) */
+ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
Oid *typioparams; /* array of element types for in_functions */
int *defmap; /* array of default att numbers */
ExprState **defexprs; /* array of default att expressions */
+ bool volatile_defexprs; /* is any of defexprs volatile? */
+ List *range_table;
+
+ PartitionDispatch *partition_dispatch_info;
+ int num_dispatch; /* Number of entries in the above array */
+ int num_partitions; /* Number of members in the following arrays */
+ ResultRelInfo *partitions; /* Per partition result relation */
+ TupleConversionMap **partition_tupconv_maps;
+ TupleTableSlot *partition_tuple_slot;
/*
* These variables are used to reduce overhead in textual COPY FROM.
*/
StringInfoData line_buf;
bool line_buf_converted; /* converted to server encoding? */
+ bool line_buf_valid; /* contains the row being processed? */
/*
* Finally, raw_buf holds raw data read from the data source (file or
- * client connection). CopyReadLine parses this data sufficiently to
+ * client connection). CopyReadLine parses this data sufficiently to
* locate line boundaries, then transfers the data to line_buf and
* converts it. Note: we guarantee that there is a \0 at
* raw_buf[raw_buf_len].
int raw_buf_len; /* total # of bytes stored */
} CopyStateData;
-/* DestReceiver for COPY (SELECT) TO */
+/* DestReceiver for COPY (query) TO */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
* function call overhead in tight COPY loops.
*
* We must use "if (1)" because the usual "do {...} while(0)" wrapper would
- * prevent the continue/break processing from working. We end the "if (1)"
+ * prevent the continue/break processing from working. We end the "if (1)"
* with "else ((void) 0)" to ensure the "if" does not unintentionally match
* any "else" in the calling code, and to avoid any compiler warnings about
* empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
/* non-export function prototypes */
-static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
- const char *queryString, List *attnamelist, List *options);
+static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
+ RawStmt *raw_query, Oid queryRelId, List *attnamelist,
+ List *options);
static void EndCopy(CopyState cstate);
-static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
- const char *filename, List *attnamelist, List *options);
+static void ClosePipeToProgram(CopyState cstate);
+static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
+ Oid queryRelId, const char *filename, bool is_program,
+ List *attnamelist, List *options);
static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
-static uint64 CopyFrom(CopyState cstate);
+static void CopyFromInsertBatch(CopyState cstate, EState *estate,
+ CommandId mycid, int hi_options,
+ ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+ BulkInsertState bistate,
+ int nBufferedTuples, HeapTuple *bufferedTuples,
+ int firstBufferedLineNo);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate);
static void SendCopyBegin(CopyState cstate);
static void ReceiveCopyBegin(CopyState cstate);
static void SendCopyEnd(CopyState cstate);
-static void CopySendData(CopyState cstate, void *databuf, int datasize);
+static void CopySendData(CopyState cstate, const void *databuf, int datasize);
static void CopySendString(CopyState cstate, const char *str);
static void CopySendChar(CopyState cstate, char c);
static void CopySendEndOfRow(CopyState cstate);
pq_endmessage(&buf);
cstate->copy_dest = COPY_NEW_FE;
}
- else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
+ else
{
/* old way */
if (cstate->binary)
pq_startcopyout();
cstate->copy_dest = COPY_OLD_FE;
}
- else
- {
- /* very old way */
- if (cstate->binary)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("COPY BINARY is not supported to stdout or from stdin")));
- pq_putemptymessage('B');
- /* grottiness needed for old COPY OUT protocol */
- pq_startcopyout();
- cstate->copy_dest = COPY_OLD_FE;
- }
}
static void
cstate->copy_dest = COPY_NEW_FE;
cstate->fe_msgbuf = makeStringInfo();
}
- else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
+ else
{
/* old way */
if (cstate->binary)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY BINARY is not supported to stdout or from stdin")));
pq_putemptymessage('G');
- cstate->copy_dest = COPY_OLD_FE;
- }
- else
- {
- /* very old way */
- if (cstate->binary)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("COPY BINARY is not supported to stdout or from stdin")));
- pq_putemptymessage('D');
+ /* any error in old protocol will make us lose sync */
+ pq_startmsgread();
cstate->copy_dest = COPY_OLD_FE;
}
/* We *must* flush here to ensure FE knows it can send. */
*----------
*/
static void
-CopySendData(CopyState cstate, void *databuf, int datasize)
+CopySendData(CopyState cstate, const void *databuf, int datasize)
{
- appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize);
+ appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize); /* only buffers the bytes in fe_msgbuf */
}
static void
#endif
}
- (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
- 1, cstate->copy_file);
- if (ferror(cstate->copy_file))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write to COPY file: %m")));
+ if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
+ cstate->copy_file) != 1 ||
+ ferror(cstate->copy_file))
+ {
+ if (cstate->is_program)
+ {
+ if (errno == EPIPE)
+ {
+ /*
+ * The pipe will be closed automatically on error at
+ * the end of transaction, but we might get a better
+ * error message from the subprocess' exit code than
+ * just "Broken Pipe".
+ */
+ ClosePipeToProgram(cstate);
+
+ /*
+ * If ClosePipeToProgram() didn't throw an error, the
+ * program terminated normally, but closed the pipe
+ * first. Restore errno, and throw an error.
+ */
+ errno = EPIPE;
+ }
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to COPY program: %m")));
+ }
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to COPY file: %m")));
+ }
break;
case COPY_OLD_FE:
/* The FE/BE protocol uses \n as newline for all platforms */
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ Assert(false); /* Not yet supported. */
+ break;
}
resetStringInfo(fe_msgbuf);
* CopyGetData reads data from the source (file or frontend)
*
* We attempt to read at least minread, and at most maxread, bytes from
- * the source. The actual number of bytes read is returned; if this is
+ * the source. The actual number of bytes read is returned; if this is
* less than minread, EOF was detected.
*
* Note: when copying from the frontend, we expect a proper EOF mark per
/* Only a \. terminator is legal EOF in old protocol */
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("unexpected EOF on client connection")));
+ errmsg("unexpected EOF on client connection with an open transaction")));
}
bytesread = minread;
break;
int mtype;
readmessage:
+ HOLD_CANCEL_INTERRUPTS();
+ pq_startmsgread();
mtype = pq_getbyte();
if (mtype == EOF)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("unexpected EOF on client connection")));
+ errmsg("unexpected EOF on client connection with an open transaction")));
if (pq_getmessage(cstate->fe_msgbuf, 0))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("unexpected EOF on client connection")));
+ errmsg("unexpected EOF on client connection with an open transaction")));
+ RESUME_CANCEL_INTERRUPTS();
switch (mtype)
{
case 'd': /* CopyData */
bytesread += avail;
}
break;
+ case COPY_CALLBACK:
+ bytesread = cstate->data_source_cb(databuf, minread, maxread);
+ break;
}
return bytesread;
*
* Either unload or reload contents of table <relation>, depending on <from>.
* (<from> = TRUE means we are inserting into the table.) In the "TO" case
- * we also support copying the output of an arbitrary SELECT query.
+ * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
+ * or DELETE query.
*
* If <pipe> is false, transfer is between the table and the file named
- * <filename>. Otherwise, transfer is between the table and our regular
+ * <filename>. Otherwise, transfer is between the table and our regular
* input/output stream. The latter could be either stdin/stdout or a
* socket, depending on whether we're running under Postmaster control.
*
* Do not allow the copy if user doesn't have proper permission to access
* the table or the specifically requested columns.
*/
-uint64
-DoCopy(const CopyStmt *stmt, const char *queryString)
+void
+DoCopy(ParseState *pstate, const CopyStmt *stmt,
+ int stmt_location, int stmt_len,
+ uint64 *processed)
{
CopyState cstate;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL);
Relation rel;
- uint64 processed;
+ Oid relid;
+ RawStmt *query = NULL;
- /* Disallow file COPY except to superusers. */
+ /* Disallow COPY to/from file or program except to superusers. */
if (!pipe && !superuser())
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("must be superuser to COPY to or from a file"),
- errhint("Anyone can COPY to stdout or from stdin. "
- "psql's \\copy command also works for anyone.")));
+ {
+ if (stmt->is_program)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to COPY to or from an external program"),
+ errhint("Anyone can COPY to stdout or from stdin. "
+ "psql's \\copy command also works for anyone.")));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to COPY to or from a file"),
+ errhint("Anyone can COPY to stdout or from stdin. "
+ "psql's \\copy command also works for anyone.")));
+ }
if (stmt->relation)
{
TupleDesc tupDesc;
- AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
- RangeTblEntry *rte;
List *attnums;
ListCell *cur;
+ RangeTblEntry *rte;
Assert(!stmt->query);
rel = heap_openrv(stmt->relation,
(is_from ? RowExclusiveLock : AccessShareLock));
- rte = makeNode(RangeTblEntry);
- rte->rtekind = RTE_RELATION;
- rte->relid = RelationGetRelid(rel);
- rte->relkind = rel->rd_rel->relkind;
- rte->requiredPerms = required_access;
+ relid = RelationGetRelid(rel);
+
+ rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
+ rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
FirstLowInvalidHeapAttributeNumber;
if (is_from)
- rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
+ rte->insertedCols = bms_add_member(rte->insertedCols, attno);
else
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
}
- ExecCheckRTPerms(list_make1(rte), true);
+ ExecCheckRTPerms(pstate->p_rtable, true);
+
+ /*
+ * Permission check for row security policies.
+ *
+ * check_enable_rls will ereport(ERROR) if the user has requested
+ * something invalid and will otherwise indicate if we should enable
+ * RLS (returns RLS_ENABLED) or not for this COPY statement.
+ *
+ * If the relation has a row security policy and we are to apply it
+ * then perform a "query" copy and allow the normal query processing
+ * to handle the policies.
+ *
+ * If RLS is not enabled for this, then just fall through to the
+ * normal non-filtering relation handling.
+ */
+ if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
+ {
+ SelectStmt *select;
+ ColumnRef *cr;
+ ResTarget *target;
+ RangeVar *from;
+ List *targetList = NIL;
+
+ if (is_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY FROM not supported with row-level security"),
+ errhint("Use INSERT statements instead.")));
+
+ /*
+ * Build target list
+ *
+ * If no columns are specified in the attribute list of the COPY
+ * command, then the target list is 'all' columns. Therefore, '*'
+ * should be used as the target list for the resulting SELECT
+ * statement.
+ *
+ * In the case that columns are specified in the attribute list,
+ * create a ColumnRef and ResTarget for each column and add them
+ * to the target list for the resulting SELECT statement.
+ */
+ if (!stmt->attlist)
+ {
+ cr = makeNode(ColumnRef);
+ cr->fields = list_make1(makeNode(A_Star));
+ cr->location = -1;
+
+ target = makeNode(ResTarget);
+ target->name = NULL;
+ target->indirection = NIL;
+ target->val = (Node *) cr;
+ target->location = -1;
+
+ targetList = list_make1(target);
+ }
+ else
+ {
+ ListCell *lc;
+
+ foreach(lc, stmt->attlist)
+ {
+ /*
+ * Build the ColumnRef for each column. Its 'fields' list
+ * holds a single String 'Value' node (see nodes/value.h),
+ * taken from the COPY column list, that names the column.
+ */
+ cr = makeNode(ColumnRef);
+ cr->fields = list_make1(lfirst(lc));
+ cr->location = -1;
+
+ /* Build the ResTarget and add the ColumnRef to it. */
+ target = makeNode(ResTarget);
+ target->name = NULL;
+ target->indirection = NIL;
+ target->val = (Node *) cr;
+ target->location = -1;
+
+ /* Add each column to the SELECT statement's target list */
+ targetList = lappend(targetList, target);
+ }
+ }
+
+ /*
+ * Build RangeVar for from clause, fully qualified based on the
+ * relation which we have opened and locked.
+ */
+ from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
+ pstrdup(RelationGetRelationName(rel)),
+ -1);
+
+ /* Build query */
+ select = makeNode(SelectStmt);
+ select->targetList = targetList;
+ select->fromClause = list_make1(from);
+
+ query = makeNode(RawStmt);
+ query->stmt = (Node *) select;
+ query->stmt_location = stmt_location;
+ query->stmt_len = stmt_len;
+
+ /*
+ * Close the relation for now, but keep the lock on it to prevent
+ * changes between now and when we start the query-based COPY.
+ *
+ * We'll reopen it later as part of the query-based COPY.
+ */
+ heap_close(rel, NoLock);
+ rel = NULL;
+ }
}
else
{
Assert(stmt->query);
+ query = makeNode(RawStmt);
+ query->stmt = stmt->query;
+ query->stmt_location = stmt_location;
+ query->stmt_len = stmt_len;
+
+ relid = InvalidOid;
rel = NULL;
}
if (is_from)
{
- /* check read-only transaction */
- if (XactReadOnly && rel->rd_backend != MyBackendId)
+ Assert(rel);
+
+ /* check read-only transaction and parallel mode */
+ if (XactReadOnly && !rel->rd_islocaltemp)
PreventCommandIfReadOnly("COPY FROM");
+ PreventCommandIfParallelMode("COPY FROM");
- cstate = BeginCopyFrom(rel, stmt->filename,
- stmt->attlist, stmt->options);
- processed = CopyFrom(cstate); /* copy from file to database */
+ cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
+ NULL, stmt->attlist, stmt->options);
+ *processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
}
else
{
- cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
+ cstate = BeginCopyTo(pstate, rel, query, relid,
+ stmt->filename, stmt->is_program,
stmt->attlist, stmt->options);
- processed = DoCopyTo(cstate); /* copy from database to file */
+ *processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
}
*/
if (rel != NULL)
heap_close(rel, (is_from ? NoLock : AccessShareLock));
-
- return processed;
}
/*
* self-consistency of the options list.
*/
void
-ProcessCopyOptions(CopyState cstate,
+ProcessCopyOptions(ParseState *pstate,
+ CopyState cstate,
bool is_from,
List *options)
{
/* Extract options from the statement node tree */
foreach(option, options)
{
- DefElem *defel = (DefElem *) lfirst(option);
+ DefElem *defel = lfirst_node(DefElem, option);
if (strcmp(defel->defname, "format") == 0)
{
if (format_specified)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
format_specified = true;
if (strcmp(fmt, "text") == 0)
/* default format */ ;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("COPY format \"%s\" not recognized", fmt)));
+ errmsg("COPY format \"%s\" not recognized", fmt),
+ parser_errposition(pstate, defel->location)));
}
else if (strcmp(defel->defname, "oids") == 0)
{
if (cstate->oids)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
cstate->oids = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "freeze") == 0)
+ {
+ if (cstate->freeze)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ cstate->freeze = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "delimiter") == 0)
{
if (cstate->delim)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
cstate->delim = defGetString(defel);
}
else if (strcmp(defel->defname, "null") == 0)
if (cstate->null_print)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
cstate->null_print = defGetString(defel);
}
else if (strcmp(defel->defname, "header") == 0)
if (cstate->header_line)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
cstate->header_line = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "quote") == 0)
if (cstate->quote)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
cstate->quote = defGetString(defel);
}
else if (strcmp(defel->defname, "escape") == 0)
if (cstate->escape)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
cstate->escape = defGetString(defel);
}
else if (strcmp(defel->defname, "force_quote") == 0)
if (cstate->force_quote || cstate->force_quote_all)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
if (defel->arg && IsA(defel->arg, A_Star))
cstate->force_quote_all = true;
else if (defel->arg && IsA(defel->arg, List))
- cstate->force_quote = (List *) defel->arg;
+ cstate->force_quote = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("argument to option \"%s\" must be a list of column names",
- defel->defname)));
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
}
else if (strcmp(defel->defname, "force_not_null") == 0)
{
if (cstate->force_notnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ if (defel->arg && IsA(defel->arg, List))
+ cstate->force_notnull = castNode(List, defel->arg);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be a list of column names",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "force_null") == 0)
+ {
+ /*
+ * FORCE_NULL takes a list of column names. A second occurrence
+ * is rejected, and like every other option here the error
+ * carries the option's parse location so the client gets an
+ * error cursor.
+ */
+ if (cstate->force_null)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
if (defel->arg && IsA(defel->arg, List))
- cstate->force_notnull = (List *) defel->arg;
+ cstate->force_null = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("argument to option \"%s\" must be a list of column names",
- defel->defname)));
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "convert_selectively") == 0)
+ {
+ /*
+ * Undocumented, not-accessible-from-SQL option: convert only the
+ * named columns to binary form, storing the rest as NULLs. It's
+ * allowed for the column list to be NIL.
+ */
+ if (cstate->convert_selectively)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ cstate->convert_selectively = true;
+ if (defel->arg == NULL || IsA(defel->arg, List))
+ cstate->convert_select = castNode(List, defel->arg);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be a list of column names",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
}
else if (strcmp(defel->defname, "encoding") == 0)
{
if (cstate->file_encoding >= 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
if (cstate->file_encoding < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("argument to option \"%s\" must be a valid encoding name",
- defel->defname)));
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("option \"%s\" not recognized",
- defel->defname)));
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
}
/*
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY force not null only available using COPY FROM")));
+ /* Check force_null */
+ if (!cstate->csv_mode && cstate->force_null != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY force null available only in CSV mode")));
+
+ if (cstate->force_null != NIL && !is_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY force null only available using COPY FROM")));
+
/* Don't allow the delimiter to appear in the null string. */
if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
ereport(ERROR,
* NULL values as <null_print>.
*/
static CopyState
-BeginCopy(bool is_from,
+BeginCopy(ParseState *pstate,
+ bool is_from,
Relation rel,
- Node *raw_query,
- const char *queryString,
+ RawStmt *raw_query,
+ Oid queryRelId,
List *attnamelist,
List *options)
{
*/
cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
"COPY",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ ALLOCSET_DEFAULT_SIZES);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/* Extract options from the statement node tree */
- ProcessCopyOptions(cstate, is_from, options);
+ ProcessCopyOptions(pstate, cstate, is_from, options);
/* Process the source/target relation or query */
if (rel)
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(cstate->rel))));
+
+ /* Initialize state for CopyFrom tuple routing. */
+ if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDispatch *partition_dispatch_info;
+ ResultRelInfo *partitions;
+ TupleConversionMap **partition_tupconv_maps;
+ TupleTableSlot *partition_tuple_slot;
+ int num_parted,
+ num_partitions;
+
+ ExecSetupPartitionTupleRouting(rel,
+ &partition_dispatch_info,
+ &partitions,
+ &partition_tupconv_maps,
+ &partition_tuple_slot,
+ &num_parted, &num_partitions);
+ cstate->partition_dispatch_info = partition_dispatch_info;
+ cstate->num_dispatch = num_parted;
+ cstate->partitions = partitions;
+ cstate->num_partitions = num_partitions;
+ cstate->partition_tupconv_maps = partition_tupconv_maps;
+ cstate->partition_tuple_slot = partition_tuple_slot;
+ }
}
else
{
Assert(!is_from);
cstate->rel = NULL;
- /* Don't allow COPY w/ OIDs from a select */
+ /* Don't allow COPY w/ OIDs from a query */
if (cstate->oids)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("COPY (SELECT) WITH OIDS is not supported")));
+ errmsg("COPY (query) WITH OIDS is not supported")));
/*
- * Run parse analysis and rewrite. Note this also acquires sufficient
+ * Run parse analysis and rewrite. Note this also acquires sufficient
* locks on the source table(s).
*
* Because the parser and planner tend to scribble on their input, we
* function and is executed repeatedly. (See also the same hack in
* DECLARE CURSOR and PREPARE.) XXX FIXME someday.
*/
- rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
- queryString, NULL, 0);
+ rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
+ pstate->p_sourcetext, NULL, 0,
+ NULL);
- /* We don't expect more or less than one result query */
- if (list_length(rewritten) != 1)
- elog(ERROR, "unexpected rewrite result");
+ /* check that we got back something we can work with */
+ if (rewritten == NIL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
+ }
+ else if (list_length(rewritten) > 1)
+ {
+ ListCell *lc;
- query = (Query *) linitial(rewritten);
- Assert(query->commandType == CMD_SELECT);
- Assert(query->utilityStmt == NULL);
+ /* examine queries to determine which error message to issue */
+ foreach(lc, rewritten)
+ {
+ Query *q = lfirst_node(Query, lc);
+
+ if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("conditional DO INSTEAD rules are not supported for COPY")));
+ if (q->querySource == QSRC_NON_INSTEAD_RULE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DO ALSO rules are not supported for the COPY")));
+ }
- /* Query mustn't use INTO, either */
- if (query->intoClause)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
+ }
+
+ query = linitial_node(Query, rewritten);
+
+ /* The grammar allows SELECT INTO, but we don't support that */
+ if (query->utilityStmt != NULL &&
+ IsA(query->utilityStmt, CreateTableAsStmt))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY (SELECT INTO) is not supported")));
+ Assert(query->utilityStmt == NULL);
+
+ /*
+ * Similarly the grammar doesn't enforce the presence of a RETURNING
+ * clause, but this is required here.
+ */
+ if (query->commandType != CMD_SELECT &&
+ query->returningList == NIL)
+ {
+ Assert(query->commandType == CMD_INSERT ||
+ query->commandType == CMD_UPDATE ||
+ query->commandType == CMD_DELETE);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY query must have a RETURNING clause")));
+ }
+
/* plan the query */
- plan = planner(query, 0, NULL);
+ plan = pg_plan_query(query, 0, NULL);
+
+ /*
+ * With row level security and a user using "COPY relation TO", we
+ * have to convert the "COPY relation TO" to a query-based COPY (eg:
+ * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
+ * in any RLS clauses.
+ *
+ * When this happens, we are passed in the relid of the originally
+ * found relation (which we have locked). As the planner will look up
+ * the relation again, we double-check here to make sure it found the
+ * same one that we have locked.
+ */
+ if (queryRelId != InvalidOid)
+ {
+ /*
+ * Note that with RLS involved there may be multiple relations,
+ * and while the one we need is almost certainly first, we don't
+ * make any guarantees of that in the planner, so check the whole
+ * list and make sure we find the original relation.
+ */
+ if (!list_member_oid(plan->relationOids, queryRelId))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("relation referenced by COPY statement has changed")));
+ }
/*
* Use a snapshot with an updated command ID to ensure this query sees
((DR_copy *) dest)->cstate = cstate;
/* Create a QueryDesc requesting no output */
- cstate->queryDesc = CreateQueryDesc(plan, queryString,
+ cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
GetActiveSnapshot(),
InvalidSnapshot,
- dest, NULL, 0);
+ dest, NULL, NULL, 0);
/*
* Call ExecutorStart to prepare the plan for execution.
num_phys_attrs = tupDesc->natts;
- /* Convert FORCE QUOTE name list to per-column flags, check validity */
+ /* Convert FORCE_QUOTE name list to per-column flags, check validity */
cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (cstate->force_quote_all)
{
if (!list_member_int(cstate->attnumlist, attnum))
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
- errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
+ errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
NameStr(tupDesc->attrs[attnum - 1]->attname))));
cstate->force_quote_flags[attnum - 1] = true;
}
}
- /* Convert FORCE NOT NULL name list to per-column flags, check validity */
+ /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (cstate->force_notnull)
{
if (!list_member_int(cstate->attnumlist, attnum))
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
- errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
+ errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
NameStr(tupDesc->attrs[attnum - 1]->attname))));
cstate->force_notnull_flags[attnum - 1] = true;
}
}
+ /* Convert FORCE_NULL name list to per-column flags, check validity */
+ cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
+ if (cstate->force_null)
+ {
+ List *attnums;
+ ListCell *cur;
+
+ attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
+
+ foreach(cur, attnums)
+ {
+ int attnum = lfirst_int(cur);
+
+ if (!list_member_int(cstate->attnumlist, attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
+ NameStr(tupDesc->attrs[attnum - 1]->attname))));
+ cstate->force_null_flags[attnum - 1] = true;
+ }
+ }
+
+ /* Convert convert_selectively name list to per-column flags */
+ if (cstate->convert_selectively)
+ {
+ List *attnums;
+ ListCell *cur;
+
+ cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
+
+ attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
+
+ foreach(cur, attnums)
+ {
+ int attnum = lfirst_int(cur);
+
+ if (!list_member_int(cstate->attnumlist, attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg_internal("selected column \"%s\" not referenced by COPY",
+ NameStr(tupDesc->attrs[attnum - 1]->attname))));
+ cstate->convert_select_flags[attnum - 1] = true;
+ }
+ }
+
/* Use client encoding when ENCODING option is not specified. */
if (cstate->file_encoding < 0)
cstate->file_encoding = pg_get_client_encoding();
}
/*
- * Release resources allocated in a cstate for COPY TO/FROM.
+ * Closes the pipe to an external program, checking the pclose() return code.
+ *
+ * Expects cstate->is_program to be set (asserted) and closes
+ * cstate->copy_file with ClosePipeStream(). A result of -1 is reported as
+ * a file-access error. Any other nonzero result is reported as a failure
+ * of the program named by cstate->filename, with wait_result_to_str()
+ * supplying the error detail. Returns normally only when the result is 0.
*/
static void
-EndCopy(CopyState cstate)
+ClosePipeToProgram(CopyState cstate)
{
- if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+ int pclose_rc;
+
+ Assert(cstate->is_program);
+
+ pclose_rc = ClosePipeStream(cstate->copy_file);
+ if (pclose_rc == -1)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not close file \"%s\": %m",
- cstate->filename)));
+ errmsg("could not close pipe to external command: %m")));
+ else if (pclose_rc != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+ errmsg("program \"%s\" failed",
+ cstate->filename),
+ errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+}
+
+/*
+ * Release resources allocated in a cstate for COPY TO/FROM.
+ */
+static void
+EndCopy(CopyState cstate)
+{
+ if (cstate->is_program)
+ {
+ ClosePipeToProgram(cstate);
+ }
+ else
+ {
+ if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m",
+ cstate->filename)));
+ }
MemoryContextDelete(cstate->copycontext);
pfree(cstate);
* Setup CopyState to read tuples from a table or a query for COPY TO.
*/
static CopyState
-BeginCopyTo(Relation rel,
- Node *query,
- const char *queryString,
+BeginCopyTo(ParseState *pstate,
+ Relation rel,
+ RawStmt *query,
+ Oid queryRelId,
const char *filename,
+ bool is_program,
List *attnamelist,
List *options)
{
errmsg("cannot copy from view \"%s\"",
RelationGetRelationName(rel)),
errhint("Try the COPY (SELECT ...) TO variant.")));
+ else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from materialized view \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from sequence \"%s\"",
RelationGetRelationName(rel))));
+ else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from partitioned table \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
RelationGetRelationName(rel))));
}
- cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
+ cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
+ options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
if (pipe)
{
+ Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
}
else
{
- mode_t oumask; /* Pre-existing umask value */
- struct stat st;
+ cstate->filename = pstrdup(filename);
+ cstate->is_program = is_program;
- /*
- * Prevent write to relative path ... too easy to shoot oneself in the
- * foot by overwriting a database file ...
- */
- if (!is_absolute_path(filename))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_NAME),
- errmsg("relative path not allowed for COPY to file")));
+ if (is_program)
+ {
+ cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not execute command \"%s\": %m",
+ cstate->filename)));
+ }
+ else
+ {
+ mode_t oumask; /* Pre-existing umask value */
+ struct stat st;
- cstate->filename = pstrdup(filename);
- oumask = umask(S_IWGRP | S_IWOTH);
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
- umask(oumask);
+ /*
+ * Prevent write to relative path ... too easy to shoot oneself in
+ * the foot by overwriting a database file ...
+ */
+ if (!is_absolute_path(filename))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("relative path not allowed for COPY to file")));
- if (cstate->copy_file == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" for writing: %m",
- cstate->filename)));
+ oumask = umask(S_IWGRP | S_IWOTH);
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+ umask(oumask);
+ if (cstate->copy_file == NULL)
+ {
+ /* copy errno because ereport subfunctions might change it */
+ int save_errno = errno;
- fstat(fileno(cstate->copy_file), &st);
- if (S_ISDIR(st.st_mode))
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for writing: %m",
+ cstate->filename),
+ (save_errno == ENOENT || save_errno == EACCES) ?
+ errhint("COPY TO instructs the PostgreSQL server process to write a file. "
+ "You may want a client-side facility such as psql's \\copy.") : 0));
+ }
+
+ if (fstat(fileno(cstate->copy_file), &st))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ cstate->filename)));
+
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
}
MemoryContextSwitchTo(oldcontext);
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype output routines, and should be faster than retail pfree's
- * anyway. (We don't need a whole econtext as CopyFrom does.)
+ * anyway. (We don't need a whole econtext as CopyFrom does.)
*/
cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
"COPY TO",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ ALLOCSET_DEFAULT_SIZES);
if (cstate->binary)
{
int32 tmp;
/* Signature */
- CopySendData(cstate, (char *) BinarySignature, 11);
+ CopySendData(cstate, BinarySignature, 11);
/* Flags field */
tmp = 0;
if (cstate->oids)
else
{
/* run the plan --- the dest receiver will send tuples */
- ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
}
else
{
- /* error is relevant to a particular line */
- if (cstate->line_buf_converted || !cstate->need_transcoding)
+ /*
+ * Error is relevant to a particular line.
+ *
+ * If line_buf still contains the correct line, and it's already
+ * transcoded, print it. If it's still in a foreign encoding, it's
+ * quite likely that the error is precisely a failure to do
+ * encoding conversion (ie, bad data). We dare not try to convert
+ * it, and at present there's no way to regurgitate it without
+ * conversion. So we have to punt and just report the line number.
+ */
+ if (cstate->line_buf_valid &&
+ (cstate->line_buf_converted || !cstate->need_transcoding))
{
char *lineval;
}
else
{
- /*
- * Here, the line buffer is still in a foreign encoding, and
- * indeed it's quite likely that the error is precisely a
- * failure to do encoding conversion (ie, bad data). We dare
- * not try to convert it, and at present there's no way to
- * regurgitate it without conversion. So we have to punt and
- * just report the line number.
- */
errcontext("COPY %s, line %d",
cstate->cur_relname, cstate->cur_lineno);
}
/*
* Copy FROM file to relation.
*/
-static uint64
+uint64
CopyFrom(CopyState cstate)
{
HeapTuple tuple;
Datum *values;
bool *nulls;
ResultRelInfo *resultRelInfo;
+ ResultRelInfo *saved_resultRelInfo = NULL;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
ExprContext *econtext;
TupleTableSlot *myslot;
MemoryContext oldcontext = CurrentMemoryContext;
- ErrorContextCallback errcontext;
+
+ ErrorContextCallback errcallback;
CommandId mycid = GetCurrentCommandId(true);
int hi_options = 0; /* start with default heap_insert options */
BulkInsertState bistate;
uint64 processed = 0;
+ bool useHeapMultiInsert;
+ int nBufferedTuples = 0;
+ int prev_leaf_part_index = -1;
+
+#define MAX_BUFFERED_TUPLES 1000
+ HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
+ Size bufferedTuplesSize = 0;
+ int firstBufferedLineNo = 0;
Assert(cstate->rel);
- if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
+ /*
+ * The target must be a plain relation or have an INSTEAD OF INSERT row
+ * trigger. (Currently, such triggers are only allowed on views, so we
+ * only hint about them in the view case.)
+ */
+ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+ cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
+ !(cstate->rel->trigdesc &&
+ cstate->rel->trigdesc->trig_insert_instead_row))
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
+ RelationGetRelationName(cstate->rel)),
+ errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
+ else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy to materialized view \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
* routine first.
*
* As mentioned in comments in utils/rel.h, the in-same-transaction test
- * is not completely reliable, since in rare cases rd_createSubid or
- * rd_newRelfilenodeSubid can be cleared before the end of the transaction.
- * However this is OK since at worst we will fail to make the optimization.
+ * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
+ * can be cleared before the end of the transaction. The exact case is
+ * when a relation sets a new relfilenode twice in same transaction, yet
+ * the second one fails in an aborted subtransaction, e.g.
+ *
+ * BEGIN;
+ * TRUNCATE t;
+ * SAVEPOINT save;
+ * TRUNCATE t;
+ * ROLLBACK TO save;
+ * COPY ...
*
* Also, if the target file is new-in-transaction, we assume that checking
* FSM for free space is a waste of time, even if we must use WAL because
* no additional work to enforce that.
*----------
*/
+ /* createSubid is creation check, newRelfilenodeSubid is truncation check */
if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
{
hi_options |= HEAP_INSERT_SKIP_WAL;
}
+ /*
+ * Optimize if new relfilenode was created in this subxact or one of its
+ * committed children and we won't see those rows later as part of an
+ * earlier scan or command. This ensures that if this subtransaction
+ * aborts then the frozen rows won't be visible after xact cleanup. Note
+ * that the stronger test of exactly which subtransaction created it is
+ * crucial for correctness of this optimization.
+ */
+ if (cstate->freeze)
+ {
+ if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("cannot perform FREEZE because of prior transaction activity")));
+
+ if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
+ cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
+
+ hi_options |= HEAP_INSERT_FROZEN;
+ }
+
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of code
* here that basically duplicated execUtils.c ...)
*/
resultRelInfo = makeNode(ResultRelInfo);
- resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
- resultRelInfo->ri_RelationDesc = cstate->rel;
- resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
- if (resultRelInfo->ri_TrigDesc)
- {
- resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
- palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
- resultRelInfo->ri_TrigWhenExprs = (List **)
- palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(List *));
- }
- resultRelInfo->ri_TrigInstrument = NULL;
+ InitResultRelInfo(resultRelInfo,
+ cstate->rel,
+ 1, /* dummy rangetable index */
+ NULL,
+ 0);
- ExecOpenIndices(resultRelInfo);
+ ExecOpenIndices(resultRelInfo, false);
estate->es_result_relations = resultRelInfo;
estate->es_num_result_relations = 1;
estate->es_result_relation_info = resultRelInfo;
+ estate->es_range_table = cstate->range_table;
/* Set up a tuple slot too */
myslot = ExecInitExtraTupleSlot(estate);
/* Triggers might need a slot as well */
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
+ /*
+ * It's more efficient to prepare a bunch of tuples for insertion, and
+ * insert them in one heap_multi_insert() call, than call heap_insert()
+ * separately for every tuple. However, we can't do that if there are
+ * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+ * expressions. Such triggers or expressions might query the table we're
+ * inserting to, and act differently if the tuples that have already been
+ * processed and prepared for insertion are not there. We also can't do
+ * it if the table is partitioned.
+ */
+ if ((resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ cstate->partition_dispatch_info != NULL ||
+ cstate->volatile_defexprs)
+ {
+ useHeapMultiInsert = false;
+ }
+ else
+ {
+ useHeapMultiInsert = true;
+ bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+ }
+
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
/*
- * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
+ * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
* should do this for COPY, since it's not really an "INSERT" statement as
* such. However, executing these triggers maintains consistency with the
* EACH ROW triggers that we already fire on COPY.
econtext = GetPerTupleExprContext(estate);
/* Set up callback to identify error line number */
- errcontext.callback = CopyFromErrorCallback;
- errcontext.arg = (void *) cstate;
- errcontext.previous = error_context_stack;
- error_context_stack = &errcontext;
+ errcallback.callback = CopyFromErrorCallback;
+ errcallback.arg = (void *) cstate;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
for (;;)
{
CHECK_FOR_INTERRUPTS();
- /* Reset the per-tuple exprcontext */
- ResetPerTupleExprContext(estate);
+ if (nBufferedTuples == 0)
+ {
+ /*
+ * Reset the per-tuple exprcontext. We can only do this if the
+ * tuple buffer is empty. (Calling the context the per-tuple
+ * memory context is a bit of a misnomer now.)
+ */
+ ResetPerTupleExprContext(estate);
+ }
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
if (loaded_oid != InvalidOid)
HeapTupleSetOid(tuple, loaded_oid);
+ /*
+ * Constraints might reference the tableoid column, so initialize
+ * t_tableOid before evaluating them.
+ */
+ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);
slot = myslot;
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+ /* Determine the partition to heap_insert the tuple into */
+ if (cstate->partition_dispatch_info)
+ {
+ int leaf_part_index;
+ TupleConversionMap *map;
+
+ /*
+ * Away we go ... If we end up not finding a partition after all,
+ * ExecFindPartition() does not return and errors out instead.
+ * Otherwise, the returned value is to be used as an index into
+ * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+ * will get us the ResultRelInfo and TupleConversionMap for the
+ * partition, respectively.
+ */
+ leaf_part_index = ExecFindPartition(resultRelInfo,
+ cstate->partition_dispatch_info,
+ slot,
+ estate);
+ Assert(leaf_part_index >= 0 &&
+ leaf_part_index < cstate->num_partitions);
+
+ /*
+ * If this tuple is mapped to a partition that is not the same as
+ * the previous one, we'd better make sure the bulk insert mechanism
+ * gets a new buffer.
+ */
+ if (prev_leaf_part_index != leaf_part_index)
+ {
+ ReleaseBulkInsertStatePin(bistate);
+ prev_leaf_part_index = leaf_part_index;
+ }
+
+ /*
+ * Save the old ResultRelInfo and switch to the one corresponding
+ * to the selected partition.
+ */
+ saved_resultRelInfo = resultRelInfo;
+ resultRelInfo = cstate->partitions + leaf_part_index;
+
+ /* We do not yet have a way to insert into a foreign partition */
+ if (resultRelInfo->ri_FdwRoutine)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot route inserted tuples to a foreign table")));
+
+ /*
+ * For ExecInsertIndexTuples() to work on the partition's indexes
+ */
+ estate->es_result_relation_info = resultRelInfo;
+
+ /*
+ * We might need to convert from the parent rowtype to the
+ * partition rowtype.
+ */
+ map = cstate->partition_tupconv_maps[leaf_part_index];
+ if (map)
+ {
+ Relation partrel = resultRelInfo->ri_RelationDesc;
+
+ tuple = do_convert_tuple(tuple, map);
+
+ /*
+ * We must use the partition's tuple descriptor from this
+ * point on. Use a dedicated slot from this point on until
+ * we're finished dealing with the partition.
+ */
+ slot = cstate->partition_tuple_slot;
+ Assert(slot != NULL);
+ ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
+ ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+ }
+
+ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+ }
+
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
if (!skip_tuple)
{
- List *recheckIndexes = NIL;
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
+ {
+ /* Pass the data to the INSTEAD ROW INSERT trigger */
+ ExecIRInsertTriggers(estate, resultRelInfo, slot);
+ }
+ else
+ {
+ /* Check the constraints of the tuple */
+ if (cstate->rel->rd_att->constr ||
+ resultRelInfo->ri_PartitionCheck)
+ ExecConstraints(resultRelInfo, slot, estate);
- /* Check the constraints of the tuple */
- if (cstate->rel->rd_att->constr)
- ExecConstraints(resultRelInfo, slot, estate);
+ if (useHeapMultiInsert)
+ {
+ /* Add this tuple to the tuple buffer */
+ if (nBufferedTuples == 0)
+ firstBufferedLineNo = cstate->cur_lineno;
+ bufferedTuples[nBufferedTuples++] = tuple;
+ bufferedTuplesSize += tuple->t_len;
- /* OK, store the tuple and create index entries for it */
- heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+ /*
+ * If the buffer filled up, flush it. Also flush if the
+ * total size of all the tuples in the buffer becomes
+ * large, to avoid using large amounts of memory for the
+ * buffer when the tuples are exceptionally wide.
+ */
+ if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+ bufferedTuplesSize > 65535)
+ {
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples,
+ firstBufferedLineNo);
+ nBufferedTuples = 0;
+ bufferedTuplesSize = 0;
+ }
+ }
+ else
+ {
+ List *recheckIndexes = NIL;
- if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
- estate);
+ /* OK, store the tuple and create index entries for it */
+ heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
+ hi_options, bistate);
- /* AFTER ROW INSERT Triggers */
- ExecARInsertTriggers(estate, resultRelInfo, tuple,
- recheckIndexes);
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(slot,
+ &(tuple->t_self),
+ estate,
+ false,
+ NULL,
+ NIL);
- list_free(recheckIndexes);
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, tuple,
+ recheckIndexes);
+
+ list_free(recheckIndexes);
+ }
+ }
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
* tuples inserted by an INSERT command.
*/
processed++;
+
+ if (saved_resultRelInfo)
+ {
+ resultRelInfo = saved_resultRelInfo;
+ estate->es_result_relation_info = resultRelInfo;
+ }
}
}
+ /* Flush any remaining buffered tuples */
+ if (nBufferedTuples > 0)
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples,
+ firstBufferedLineNo);
+
/* Done, clean up */
- error_context_stack = errcontext.previous;
+ error_context_stack = errcallback.previous;
FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext);
+ /*
+ * In the old protocol, tell pqcomm that we can process normal protocol
+ * messages again.
+ */
+ if (cstate->copy_dest == COPY_OLD_FE)
+ pq_endmsgread();
+
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers(estate, resultRelInfo);
ExecCloseIndices(resultRelInfo);
+ /* Close all the partitioned tables, leaf partitions, and their indices */
+ if (cstate->partition_dispatch_info)
+ {
+ int i;
+
+ /*
+ * Remember cstate->partition_dispatch_info[0] corresponds to the root
+ * partitioned table, which we must not try to close, because it is
+ * the main target table of COPY that will be closed eventually by
+ * DoCopy(). Also, tupslot is NULL for the root partitioned table.
+ */
+ for (i = 1; i < cstate->num_dispatch; i++)
+ {
+ PartitionDispatch pd = cstate->partition_dispatch_info[i];
+
+ heap_close(pd->reldesc, NoLock);
+ ExecDropSingleTupleTableSlot(pd->tupslot);
+ }
+ for (i = 0; i < cstate->num_partitions; i++)
+ {
+ ResultRelInfo *resultRelInfo = cstate->partitions + i;
+
+ ExecCloseIndices(resultRelInfo);
+ heap_close(resultRelInfo->ri_RelationDesc, NoLock);
+ }
+
+ /* Release the standalone partition tuple descriptor */
+ ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
+ }
+
+ /* Close any trigger target relations */
+ ExecCleanUpTriggerState(estate);
+
FreeExecutorState(estate);
/*
return processed;
}
+/*
+ * A subroutine of CopyFrom, to write the current batch of buffered heap
+ * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
+ * triggers.
+ *
+ * The first nBufferedTuples entries of bufferedTuples[] are inserted into
+ * cstate->rel with a single heap_multi_insert() call; mycid, hi_options and
+ * bistate are handed to that call unchanged. resultRelInfo is consulted
+ * for the number of indexes and for the trigger descriptor, and myslot is
+ * the slot each tuple is stored into before its index entries are made.
+ *
+ * firstBufferedLineNo is the line number the caller recorded for the first
+ * buffered tuple. While tuple i is being processed, cstate->cur_lineno is
+ * set to firstBufferedLineNo + i; the value cur_lineno had on entry is put
+ * back before returning. cstate->line_buf_valid is cleared here and is
+ * not restored by this function.
+ *
+ * nBufferedTuples is passed by value, so the caller is responsible for
+ * resetting its own buffer count after this returns.
+ */
+static void
+CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
+ int hi_options, ResultRelInfo *resultRelInfo,
+ TupleTableSlot *myslot, BulkInsertState bistate,
+ int nBufferedTuples, HeapTuple *bufferedTuples,
+ int firstBufferedLineNo)
+{
+ MemoryContext oldcontext;
+ int i;
+ int save_cur_lineno;
+
+ /*
+ * Print error context information correctly, if one of the operations
+ * below fails. Clearing line_buf_valid flags line_buf as not holding
+ * the line being worked on; cur_lineno is saved so it can be overridden
+ * per tuple below and put back at the end.
+ */
+ cstate->line_buf_valid = false;
+ save_cur_lineno = cstate->cur_lineno;
+
+ /*
+ * heap_multi_insert leaks memory, so switch to short-lived memory context
+ * before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ heap_multi_insert(cstate->rel,
+ bufferedTuples,
+ nBufferedTuples,
+ mycid,
+ hi_options,
+ bistate);
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * If there are any indexes, update them for all the inserted tuples, and
+ * run AFTER ROW INSERT triggers. cur_lineno is set for each tuple before
+ * the work is done, so the error context reflects that tuple's line.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ for (i = 0; i < nBufferedTuples; i++)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = firstBufferedLineNo + i;
+ ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+ recheckIndexes =
+ ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+ estate, false, NULL, NIL);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ bufferedTuples[i],
+ recheckIndexes);
+ list_free(recheckIndexes);
+ }
+ }
+
+ /*
+ * There are no indexes, but see if we need to run AFTER ROW INSERT
+ * triggers anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+ {
+ for (i = 0; i < nBufferedTuples; i++)
+ {
+ cstate->cur_lineno = firstBufferedLineNo + i;
+ ExecARInsertTriggers(estate, resultRelInfo,
+ bufferedTuples[i],
+ NIL);
+ }
+ }
+
+ /* reset cur_lineno to where we were */
+ cstate->cur_lineno = save_cur_lineno;
+}
+
/*
* Setup to read tuples from a file for COPY FROM.
*
* Returns a CopyState, to be passed to NextCopyFrom and related functions.
*/
CopyState
-BeginCopyFrom(Relation rel,
+BeginCopyFrom(ParseState *pstate,
+ Relation rel,
const char *filename,
+ bool is_program,
+ copy_data_source_cb data_source_cb,
List *attnamelist,
List *options)
{
int *defmap;
ExprState **defexprs;
MemoryContext oldcontext;
+ bool volatile_defexprs;
- cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
+ cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/* Initialize state variables */
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
+ /* Assign range table, we'll need it in CopyFrom. */
+ if (pstate)
+ cstate->range_table = pstate->p_rtable;
+
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
num_defaults = 0;
+ volatile_defexprs = false;
/*
* Pick up the required catalog information for each attribute in the
{
/* attribute is NOT to be copied from input */
/* use default value if one exists */
- Node *defexpr = build_column_default(cstate->rel, attnum);
+ Expr *defexpr = (Expr *) build_column_default(cstate->rel,
+ attnum);
if (defexpr != NULL)
{
- /* Initialize expressions in copycontext. */
- defexprs[num_defaults] = ExecInitExpr(
- expression_planner((Expr *) defexpr), NULL);
+ /* Run the expression through planner */
+ defexpr = expression_planner(defexpr);
+
+ /* Initialize executable expression in copycontext */
+ defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
defmap[num_defaults] = attnum - 1;
num_defaults++;
+
+ /*
+ * If a default expression looks at the table being loaded,
+ * then it could give the wrong answer when using
+ * multi-insert. Since database access can be dynamic this is
+ * hard to test for exactly, so we use the much wider test of
+ * whether the default expression is volatile. We allow for
+ * the special case of when the default expression is the
+ * nextval() of a sequence which in this specific case is
+ * known to be safe for use with the multi-insert
+ * optimization. Hence we use this special case function
+ * checker rather than the standard check for
+ * contain_volatile_functions().
+ */
+ if (!volatile_defexprs)
+ volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
}
}
}
cstate->typioparams = typioparams;
cstate->defmap = defmap;
cstate->defexprs = defexprs;
+ cstate->volatile_defexprs = volatile_defexprs;
cstate->num_defaults = num_defaults;
+ cstate->is_program = is_program;
- if (pipe)
+ if (data_source_cb)
{
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_source_cb = data_source_cb;
+ }
+ else if (pipe)
+ {
+ Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
else
}
else
{
- struct stat st;
-
cstate->filename = pstrdup(filename);
- cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
- if (cstate->copy_file == NULL)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\" for reading: %m",
- cstate->filename)));
+ if (cstate->is_program)
+ {
+ cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not execute command \"%s\": %m",
+ cstate->filename)));
+ }
+ else
+ {
+ struct stat st;
- fstat(fileno(cstate->copy_file), &st);
- if (S_ISDIR(st.st_mode))
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is a directory", cstate->filename)));
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+ if (cstate->copy_file == NULL)
+ {
+ /* copy errno because ereport subfunctions might change it */
+ int save_errno = errno;
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for reading: %m",
+ cstate->filename),
+ (save_errno == ENOENT || save_errno == EACCES) ?
+ errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
+ "You may want a client-side facility such as psql's \\copy.") : 0));
+ }
+
+ if (fstat(fileno(cstate->copy_file), &st))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ cstate->filename)));
+
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
}
if (!cstate->binary)
NameStr(attr[m]->attname))));
string = field_strings[fieldno++];
- if (cstate->csv_mode && string == NULL &&
- cstate->force_notnull_flags[m])
+ if (cstate->convert_select_flags &&
+ !cstate->convert_select_flags[m])
+ {
+ /* ignore input field, leaving column as NULL */
+ continue;
+ }
+
+ if (cstate->csv_mode)
{
- /* Go ahead and read the NULL string */
- string = cstate->null_print;
+ if (string == NULL &&
+ cstate->force_notnull_flags[m])
+ {
+ /*
+ * FORCE_NOT_NULL option is set and column is NULL -
+ * convert it to the NULL string.
+ */
+ string = cstate->null_print;
+ }
+ else if (string != NULL && cstate->force_null_flags[m]
+ && strcmp(string, cstate->null_print) == 0)
+ {
+ /*
+ * FORCE_NULL option is set and column matches the NULL
+ * string. It must have been quoted, or otherwise the
+ * string would already have been set to NULL. Convert it
+ * to NULL as specified.
+ */
+ string = NULL;
+ }
}
cstate->cur_attname = NameStr(attr[m]->attname);
* if client chooses to send that now.
*
* Note that we MUST NOT try to read more data in an old-protocol
- * copy, since there is no protocol-level EOF marker then. We
+ * copy, since there is no protocol-level EOF marker then. We
* could go either way for copy from file, but choose to throw
* error if there's data after the EOF marker, for consistency
* with the new-protocol case.
/*
* Now compute and insert any defaults available for the columns not
- * provided by the input data. Anything not processed here or above will
+ * provided by the input data. Anything not processed here or above will
* remain NULL.
*/
for (i = 0; i < num_defaults; i++)
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
- &nulls[defmap[i]], NULL);
+ &nulls[defmap[i]]);
}
return true;
* server encoding.
*
* Result is true if read was terminated by EOF, false if terminated
- * by newline. The terminating newline or EOF marker is not included
+ * by newline. The terminating newline or EOF marker is not included
* in the final value of line_buf.
*/
static bool
bool result;
resetStringInfo(&cstate->line_buf);
+ cstate->line_buf_valid = true;
/* Mark that encoding conversion hasn't occurred yet */
cstate->line_buf_converted = false;
* of read-ahead and avoid the many calls to
* IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
* does not allow us to read too far ahead or we might read into the
- * next data, so we read-ahead only as far we know we can. One
+ * next data, so we read-ahead only as far we know we can. One
* optimization would be to read-ahead four byte here if
* cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
* considering the size of the buffer.
REFILL_LINEBUF;
/*
- * Try to read some more data. This will certainly reset
+ * Try to read some more data. This will certainly reset
* raw_buf_index to zero, and raw_buf_ptr must go with it.
*/
if (!CopyLoadRawBuf(cstate))
* just use the char as a toggle. If they are different, we need
* to ensure that we only take account of an escape inside a
* quoted field and immediately preceding a quote char, and not
- * the second in a escape-escape sequence.
+ * the second in an escape-escape sequence.
*/
if (in_quote && c == escapec)
last_was_esc = !last_was_esc;
/*
* Updating the line count for embedded CR and/or LF chars is
* necessarily a little fragile - this test is probably about the
- * best we can do. (XXX it's arguable whether we should do this
+ * best we can do. (XXX it's arguable whether we should do this
* at all --- is cur_lineno a physical or logical count?)
*/
if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
* after a backslash is special, so we skip over that second
* character too. If we didn't do that \\. would be
* considered an eof-of copy, while in non-CSV mode it is a
- * literal backslash followed by a period. In CSV mode,
+ * literal backslash followed by a period. In CSV mode,
* backslashes are not special, so we want to process the
* character after the backslash just like a normal character,
* so we don't increment in those cases.
/*
* The de-escaped attributes will certainly not be longer than the input
* data line, so we can just force attribute_buf to be large enough and
- * then transfer data without any checks for enough space. We need to do
+ * then transfer data without any checks for enough space. We need to do
* it this way because enlarging attribute_buf mid-stream would invalidate
* pointers already stored into cstate->raw_fields[].
*/
start_ptr = cur_ptr;
cstate->raw_fields[fieldno] = output_ptr;
- /* Scan data for field */
+ /*
+ * Scan data for field.
+ *
+ * Note that in this loop, we are scanning to locate the end of field
+ * and also speculatively performing de-escaping. Once we find the
+ * end-of-field, we can match the raw field contents against the null
+ * marker string. Only after that comparison fails do we know that
+ * de-escaping is actually the right thing to do; therefore we *must
+ * not* throw any syntax errors before we've done the null-marker
+ * check.
+ */
for (;;)
{
char c;
*output_ptr++ = c;
}
- /* Terminate attribute value in output area */
- *output_ptr++ = '\0';
-
- /*
- * If we de-escaped a non-7-bit-ASCII char, make sure we still have
- * valid data for the db encoding. Avoid calling strlen here for the
- * sake of efficiency.
- */
- if (saw_non_ascii)
- {
- char *fld = cstate->raw_fields[fieldno];
-
- pg_verifymbstr(fld, output_ptr - (fld + 1), false);
- }
-
/* Check whether raw input matched null marker */
input_len = end_ptr - start_ptr;
if (input_len == cstate->null_print_len &&
strncmp(start_ptr, cstate->null_print, input_len) == 0)
cstate->raw_fields[fieldno] = NULL;
+ else
+ {
+ /*
+ * At this point we know the field is supposed to contain data.
+ *
+ * If we de-escaped any non-7-bit-ASCII chars, make sure the
+ * resulting string is valid data for the db encoding.
+ */
+ if (saw_non_ascii)
+ {
+ char *fld = cstate->raw_fields[fieldno];
+
+ pg_verifymbstr(fld, output_ptr - fld, false);
+ }
+ }
+
+ /* Terminate attribute value in output area */
+ *output_ptr++ = '\0';
fieldno++;
/* Done if we hit EOL instead of a delim */
/*
* The de-escaped attributes will certainly not be longer than the input
* data line, so we can just force attribute_buf to be large enough and
- * then transfer data without any checks for enough space. We need to do
+ * then transfer data without any checks for enough space. We need to do
* it this way because enlarging attribute_buf mid-stream would invalidate
* pointers already stored into cstate->raw_fields[].
*/
/*
* We have to grovel through the string searching for control characters
* and instances of the delimiter character. In most cases, though, these
- * are infrequent. To avoid overhead from calling CopySendData once per
+ * are infrequent. To avoid overhead from calling CopySendData once per
* character, we dump out all characters between escaped characters in a
* single call. The loop invariant is that the data from "start" to "ptr"
* can be sent literally, but hasn't yet been.
/*
* copy_dest_receive --- receive one tuple
*
* Treats "self" as a DR_copy, passes the slot's tts_values / tts_isnull
* arrays to CopyOneRowTo() (with InvalidOid as the second argument), and
* increments the receiver's "processed" counter. In the bool-returning
* form of this function the result is unconditionally true.
*
* NOTE(review): "cstate" is not declared in the lines visible here; it is
* presumably obtained from myState in context that is not shown --- confirm.
*/
-static void
+static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_copy *myState = (DR_copy *) self;
/* And send the data */
CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
myState->processed++;
+
+ return true;
}
/*