1 /*-------------------------------------------------------------------------
4 * Implements the COPY utility command
6 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/commands/copy.c
13 *-------------------------------------------------------------------------
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "rewrite/rewriteHandler.h"
41 #include "storage/fd.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/memutils.h"
46 #include "utils/portal.h"
47 #include "utils/rel.h"
48 #include "utils/rls.h"
49 #include "utils/snapmgr.h"
52 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
53 #define OCTVALUE(c) ((c) - '0')
56 * Represents the different source/dest cases we need to worry about at
61 COPY_FILE, /* to/from file (or a piped program) */
62 COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
63 COPY_NEW_FE /* to/from frontend (3.0 protocol) */
67 * Represents the end-of-line terminator type of the input
78 * This struct contains all the state variables used throughout a COPY
79 * operation. For simplicity, we use the same struct for all variants of COPY,
80 * even though some fields are used in only some cases.
82 * Multi-byte encodings: all supported client-side encodings encode multi-byte
83 * characters by having the first byte's high bit set. Subsequent bytes of the
84 * character can have the high bit not set. When scanning data in such an
85 * encoding to look for a match to a single-byte (ie ASCII) character, we must
86 * use the full pg_encoding_mblen() machinery to skip over multibyte
87 * characters, else we might find a false match to a trailing byte. In
88 * supported server encodings, there is no possibility of a false match, and
89 * it's faster to make useless comparisons to trailing bytes than it is to
90 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
91 * when we have to do it the hard way.
93 typedef struct CopyStateData
95 /* low-level state data */
96 CopyDest copy_dest; /* type of copy source/destination */
97 FILE *copy_file; /* used if copy_dest == COPY_FILE */
98 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
99 * dest == COPY_NEW_FE in COPY FROM */
100 bool fe_eof; /* true if detected end of copy data */
101 EolType eol_type; /* EOL type of input */
102 int file_encoding; /* file or remote side's character encoding */
103 bool need_transcoding; /* file encoding diff from server? */
104 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
106 /* parameters from the COPY command */
107 Relation rel; /* relation to copy to or from */
108 QueryDesc *queryDesc; /* executable query to copy from */
109 List *attnumlist; /* integer list of attnums to copy */
110 char *filename; /* filename, or NULL for STDIN/STDOUT */
111 bool is_program; /* is 'filename' a program to popen? */
112 bool binary; /* binary format? */
113 bool oids; /* include OIDs? */
114 bool freeze; /* freeze rows on loading? */
115 bool csv_mode; /* Comma Separated Value format? */
116 bool header_line; /* CSV header line? */
117 char *null_print; /* NULL marker string (server encoding!) */
118 int null_print_len; /* length of same */
119 char *null_print_client; /* same converted to file encoding */
120 char *delim; /* column delimiter (must be 1 byte) */
121 char *quote; /* CSV quote char (must be 1 byte) */
122 char *escape; /* CSV escape char (must be 1 byte) */
123 List *force_quote; /* list of column names */
124 bool force_quote_all; /* FORCE_QUOTE *? */
125 bool *force_quote_flags; /* per-column CSV FQ flags */
126 List *force_notnull; /* list of column names */
127 bool *force_notnull_flags; /* per-column CSV FNN flags */
128 List *force_null; /* list of column names */
129 bool *force_null_flags; /* per-column CSV FN flags */
130 bool convert_selectively; /* do selective binary conversion? */
131 List *convert_select; /* list of column names (can be NIL) */
132 bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
134 /* these are just for error messages, see CopyFromErrorCallback */
135 const char *cur_relname; /* table name for error messages */
136 int cur_lineno; /* line number for error messages */
137 const char *cur_attname; /* current att for error messages */
138 const char *cur_attval; /* current att value for error messages */
141 * Working state for COPY TO/FROM
143 MemoryContext copycontext; /* per-copy execution context */
146 * Working state for COPY TO
148 FmgrInfo *out_functions; /* lookup info for output functions */
149 MemoryContext rowcontext; /* per-row evaluation context */
152 * Working state for COPY FROM
154 AttrNumber num_defaults; /* presumably the number of entries in defmap/defexprs -- TODO confirm */
156 FmgrInfo oid_in_function; /* presumably input-function lookup info for the OID column -- TODO confirm */
158 FmgrInfo *in_functions; /* array of input functions, one per attribute */
159 Oid *typioparams; /* array of element types for in_functions */
160 int *defmap; /* array of default att numbers */
161 ExprState **defexprs; /* array of default att expressions */
162 bool volatile_defexprs; /* is any of defexprs volatile? */
166 * These variables are used to reduce overhead in textual COPY FROM.
168 * attribute_buf holds the separated, de-escaped text for each field of
169 * the current line. The CopyReadAttributes functions return arrays of
170 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
171 * the buffer on each cycle.
173 StringInfoData attribute_buf;
175 /* field raw data pointers found by COPY FROM */
181 * Similarly, line_buf holds the whole input line being processed. The
182 * input cycle is first to read the whole line into line_buf, convert it
183 * to server encoding there, and then extract the individual attribute
184 * fields into attribute_buf. line_buf is preserved unmodified so that we
185 * can display it in error messages if appropriate.
187 StringInfoData line_buf;
188 bool line_buf_converted; /* converted to server encoding? */
189 bool line_buf_valid; /* contains the row being processed? */
192 * Finally, raw_buf holds raw data read from the data source (file or
193 * client connection). CopyReadLine parses this data sufficiently to
194 * locate line boundaries, then transfers the data to line_buf and
195 * converts it. Note: we guarantee that there is a \0 at
196 * raw_buf[raw_buf_len].
198 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
200 int raw_buf_index; /* next byte to process */
201 int raw_buf_len; /* total # of bytes stored */
204 /* DestReceiver for COPY (query) TO */
207 DestReceiver pub; /* publicly-known function pointers */
208 CopyState cstate; /* CopyStateData for the command */
209 uint64 processed; /* # of tuples processed */
214 * These macros centralize code used to process line_buf and raw_buf buffers.
215 * They are macros because they often do continue/break control and to avoid
216 * function call overhead in tight COPY loops.
218 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
219 * prevent the continue/break processing from working. We end the "if (1)"
220 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
221 * any "else" in the calling code, and to avoid any compiler warnings about
222 * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
226 * This keeps the character read at the top of the loop in the buffer
227 * even if there is more than one read-ahead.
/*
 * extralen is the amount of read-ahead added to raw_buf_ptr in the bounds
 * test.  The macro is not self-contained: it reads and writes the caller's
 * locals raw_buf_ptr, prev_raw_ptr, copy_buf_len and hit_eof, so it can only
 * be expanded where all four are in scope.
 */
229 #define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
232 if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
234 raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
240 /* This consumes the remainder of the buffer and breaks */
/*
 * Counterpart of the NOT_EOF variant: the same bounds test on
 * raw_buf_ptr + extralen, but taken only when hit_eof is already set.  It
 * moves raw_buf_ptr to copy_buf_len.  Relies on the caller's locals
 * raw_buf_ptr, copy_buf_len and hit_eof.
 */
241 #define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
244 if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
247 raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
248 /* backslash just before EOF, treat as data char */ \
255 * Transfer any approved data to line_buf; must do this to be sure
256 * there is some room in raw_buf.
/*
 * Appends the bytes of cstate->raw_buf from cstate->raw_buf_index up to (not
 * including) raw_buf_ptr onto cstate->line_buf, then advances
 * cstate->raw_buf_index to raw_buf_ptr.  Does nothing when raw_buf_ptr has
 * not moved past raw_buf_index.  Expects `cstate` and `raw_buf_ptr` to be in
 * scope at the point of expansion.
 */
258 #define REFILL_LINEBUF \
261 if (raw_buf_ptr > cstate->raw_buf_index) \
263 appendBinaryStringInfo(&cstate->line_buf, \
264 cstate->raw_buf + cstate->raw_buf_index, \
265 raw_buf_ptr - cstate->raw_buf_index); \
266 cstate->raw_buf_index = raw_buf_ptr; \
270 /* Undo any read-ahead and jump out of the block. */
/*
 * Resets raw_buf_ptr to one byte past prev_raw_ptr and transfers control to
 * the label not_end_of_copy.  The expanding function must therefore declare
 * raw_buf_ptr and prev_raw_ptr and define that label.
 */
271 #define NO_END_OF_COPY_GOTO \
274 raw_buf_ptr = prev_raw_ptr + 1; \
275 goto not_end_of_copy; \
278 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
281 /* non-export function prototypes */
282 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, Node *raw_query,
283 const Oid queryRelId, List *attnamelist,
285 static void EndCopy(CopyState cstate);
286 static void ClosePipeToProgram(CopyState cstate);
287 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, Node *query,
288 const Oid queryRelId, const char *filename, bool is_program,
289 List *attnamelist, List *options);
290 static void EndCopyTo(CopyState cstate);
291 static uint64 DoCopyTo(CopyState cstate);
292 static uint64 CopyTo(CopyState cstate);
293 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
294 Datum *values, bool *nulls);
295 static uint64 CopyFrom(CopyState cstate);
296 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
297 CommandId mycid, int hi_options,
298 ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
299 BulkInsertState bistate,
300 int nBufferedTuples, HeapTuple *bufferedTuples,
301 int firstBufferedLineNo);
302 static bool CopyReadLine(CopyState cstate);
303 static bool CopyReadLineText(CopyState cstate);
304 static int CopyReadAttributesText(CopyState cstate);
305 static int CopyReadAttributesCSV(CopyState cstate);
306 static Datum CopyReadBinaryAttribute(CopyState cstate,
307 int column_no, FmgrInfo *flinfo,
308 Oid typioparam, int32 typmod,
310 static void CopyAttributeOutText(CopyState cstate, char *string);
311 static void CopyAttributeOutCSV(CopyState cstate, char *string,
312 bool use_quote, bool single_attr);
313 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
315 static char *limit_printout_length(const char *str);
317 /* Low-level communications functions */
318 static void SendCopyBegin(CopyState cstate);
319 static void ReceiveCopyBegin(CopyState cstate);
320 static void SendCopyEnd(CopyState cstate);
321 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
322 static void CopySendString(CopyState cstate, const char *str);
323 static void CopySendChar(CopyState cstate, char c);
324 static void CopySendEndOfRow(CopyState cstate);
325 static int CopyGetData(CopyState cstate, void *databuf,
326 int minread, int maxread);
327 static void CopySendInt32(CopyState cstate, int32 val);
328 static bool CopyGetInt32(CopyState cstate, int32 *val);
329 static void CopySendInt16(CopyState cstate, int16 val);
330 static bool CopyGetInt16(CopyState cstate, int16 *val);
334 * Send copy start/stop messages for frontend copies. These have changed
335 * in past protocol redesigns.
/*
 * SendCopyBegin tells the frontend that COPY data is about to be sent and
 * records the protocol flavour in cstate->copy_dest.
 *
 * Protocol 3 or later: an 'H' message is built carrying the overall format
 * (1 when cstate->binary, else 0), the column count taken from
 * cstate->attnumlist, and one format code per column; copy_dest becomes
 * COPY_NEW_FE.
 * Protocol 2: an empty 'H' message is sent; copy_dest becomes COPY_OLD_FE.
 * Anything older: an empty 'B' message is sent; copy_dest becomes COPY_OLD_FE.
 *
 * NOTE(review): both old-protocol branches contain the "COPY BINARY is not
 * supported" error; the condition guarding it is not visible in this excerpt
 * (presumably cstate->binary -- confirm).
 */
338 SendCopyBegin(CopyState cstate)
340 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
344 int natts = list_length(cstate->attnumlist);
345 int16 format = (cstate->binary ? 1 : 0);
348 pq_beginmessage(&buf, 'H');
349 pq_sendbyte(&buf, format); /* overall format */
350 pq_sendint(&buf, natts, 2);
/* every column is announced with the same format code as the overall one */
351 for (i = 0; i < natts; i++)
352 pq_sendint(&buf, format, 2); /* per-column formats */
354 cstate->copy_dest = COPY_NEW_FE;
356 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
361 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
362 errmsg("COPY BINARY is not supported to stdout or from stdin")));
363 pq_putemptymessage('H');
364 /* grottiness needed for old COPY OUT protocol */
366 cstate->copy_dest = COPY_OLD_FE;
373 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
374 errmsg("COPY BINARY is not supported to stdout or from stdin")));
375 pq_putemptymessage('B');
376 /* grottiness needed for old COPY OUT protocol */
378 cstate->copy_dest = COPY_OLD_FE;
/*
 * ReceiveCopyBegin tells the frontend that the backend is ready to receive
 * COPY data and records the protocol flavour in cstate->copy_dest.
 *
 * Protocol 3 or later: a 'G' message is built with the overall format, the
 * column count and one format code per column; copy_dest becomes COPY_NEW_FE
 * and cstate->fe_msgbuf is allocated with makeStringInfo() to hold incoming
 * messages.
 * Protocol 2: an empty 'G' message is sent; copy_dest becomes COPY_OLD_FE.
 * Anything older: an empty 'D' message is sent; copy_dest becomes COPY_OLD_FE.
 *
 * NOTE(review): the condition guarding the "COPY BINARY is not supported"
 * errors in the old-protocol branches is not visible in this excerpt.
 */
383 ReceiveCopyBegin(CopyState cstate)
385 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
389 int natts = list_length(cstate->attnumlist);
390 int16 format = (cstate->binary ? 1 : 0);
393 pq_beginmessage(&buf, 'G');
394 pq_sendbyte(&buf, format); /* overall format */
395 pq_sendint(&buf, natts, 2);
396 for (i = 0; i < natts; i++)
397 pq_sendint(&buf, format, 2); /* per-column formats */
399 cstate->copy_dest = COPY_NEW_FE;
400 cstate->fe_msgbuf = makeStringInfo();
402 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
407 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408 errmsg("COPY BINARY is not supported to stdout or from stdin")));
409 pq_putemptymessage('G');
410 /* any error in old protocol will make us lose sync */
412 cstate->copy_dest = COPY_OLD_FE;
419 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
420 errmsg("COPY BINARY is not supported to stdout or from stdin")));
421 pq_putemptymessage('D');
422 /* any error in old protocol will make us lose sync */
424 cstate->copy_dest = COPY_OLD_FE;
426 /* We *must* flush here to ensure FE knows it can send. */
/*
 * SendCopyEnd terminates a COPY-to-frontend data stream.
 *
 * For COPY_NEW_FE it asserts that nothing is left unsent in
 * cstate->fe_msgbuf and sends an empty 'c' message.  Otherwise it queues the
 * two-character "\." end marker, flushes it through CopySendEndOfRow, and
 * calls pq_endcopyout(false).
 */
431 SendCopyEnd(CopyState cstate)
433 if (cstate->copy_dest == COPY_NEW_FE)
435 /* Shouldn't have any unsent data */
436 Assert(cstate->fe_msgbuf->len == 0);
437 /* Send Copy Done message */
438 pq_putemptymessage('c');
/* the C literal "\\." is the two bytes backslash and dot */
442 CopySendData(cstate, "\\.", 2);
443 /* Need to flush out the trailer (this also appends a newline) */
444 CopySendEndOfRow(cstate);
445 pq_endcopyout(false);
450 * CopySendData sends output data to the destination (file or frontend)
451 * CopySendString does the same for null-terminated strings
452 * CopySendChar does the same for single characters
453 * CopySendEndOfRow does the appropriate thing at end of each data row
454 * (data is not actually flushed except by CopySendEndOfRow)
456 * NB: no data conversion is applied by these functions
/*
 * Append datasize bytes starting at databuf to cstate->fe_msgbuf.  Nothing is
 * written to the destination here; the bytes are only buffered.
 */
460 CopySendData(CopyState cstate, const void *databuf, int datasize)
462 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
/*
 * Append the null-terminated string str (without its terminator, length
 * taken from strlen) to cstate->fe_msgbuf.
 */
466 CopySendString(CopyState cstate, const char *str)
468 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
/* Append the single character c to cstate->fe_msgbuf. */
472 CopySendChar(CopyState cstate, char c)
474 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
/*
 * CopySendEndOfRow pushes the row accumulated in cstate->fe_msgbuf out to
 * the destination selected by cstate->copy_dest, then empties the buffer
 * with resetStringInfo().
 *
 * File or program: a line terminator ('\n' or "\r\n") is appended and the
 * whole buffer is written with a single fwrite(); a short write or a stream
 * error is reported as "could not write to COPY program" or "could not write
 * to COPY file" depending on cstate->is_program.
 * Old-protocol frontend: the bytes go out through pq_putbytes(); failure is
 * reported as "connection lost during COPY to stdout".
 * New-protocol frontend: the buffer is sent as one 'd' (CopyData) message;
 * the result of pq_putmessage() is explicitly discarded.
 *
 * NOTE(review): the case labels and the conditions that decide when a
 * terminator is appended are not visible in this excerpt.
 */
478 CopySendEndOfRow(CopyState cstate)
480 StringInfo fe_msgbuf = cstate->fe_msgbuf;
482 switch (cstate->copy_dest)
487 /* Default line termination depends on platform */
489 CopySendChar(cstate, '\n');
491 CopySendString(cstate, "\r\n");
/* one item of fe_msgbuf->len bytes: fwrite returns 1 only if all of it was written */
495 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
496 cstate->copy_file) != 1 ||
497 ferror(cstate->copy_file))
499 if (cstate->is_program)
504 * The pipe will be closed automatically on error at
505 * the end of transaction, but we might get a better
506 * error message from the subprocess' exit code than
509 ClosePipeToProgram(cstate);
512 * If ClosePipeToProgram() didn't throw an error, the
513 * program terminated normally, but closed the pipe
514 * first. Restore errno, and throw an error.
519 (errcode_for_file_access(),
520 errmsg("could not write to COPY program: %m")));
524 (errcode_for_file_access(),
525 errmsg("could not write to COPY file: %m")));
529 /* The FE/BE protocol uses \n as newline for all platforms */
531 CopySendChar(cstate, '\n');
533 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
535 /* no hope of recovering connection sync, so FATAL */
537 (errcode(ERRCODE_CONNECTION_FAILURE),
538 errmsg("connection lost during COPY to stdout")));
542 /* The FE/BE protocol uses \n as newline for all platforms */
544 CopySendChar(cstate, '\n');
546 /* Dump the accumulated row as one CopyData message */
547 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
551 resetStringInfo(fe_msgbuf);
555 * CopyGetData reads data from the source (file or frontend)
557 * We attempt to read at least minread, and at most maxread, bytes from
558 * the source. The actual number of bytes read is returned; if this is
559 * less than minread, EOF was detected.
561 * Note: when copying from the frontend, we expect a proper EOF mark per
562 * protocol; if the frontend simply drops the connection, we raise error.
563 * It seems unwise to allow the COPY IN to complete normally in that case.
565 * NB: no data conversion is applied here.
/*
 * Per-destination behaviour of CopyGetData, keyed on cstate->copy_dest:
 *
 * File: a single fread() of up to maxread bytes; a stream error is reported
 * as "could not read from COPY file".
 * Old-protocol frontend: exactly minread bytes are requested from
 * pq_getbytes(); failure is reported as an unexpected EOF.
 * New-protocol frontend: protocol messages are pulled into cstate->fe_msgbuf
 * until at least minread bytes have been handed back or cstate->fe_eof is
 * set.  'd' supplies data, 'c' sets fe_eof, 'f' raises
 * ERRCODE_QUERY_CANCELED with the client's message text, Flush/Sync are
 * ignored, and anything else is a protocol violation.
 *
 * NOTE(review): case labels, local declarations and the return statement
 * are not visible in this excerpt.
 */
568 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
572 switch (cstate->copy_dest)
575 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
576 if (ferror(cstate->copy_file))
578 (errcode_for_file_access(),
579 errmsg("could not read from COPY file: %m")));
584 * We cannot read more than minread bytes (which in practice is 1)
585 * because old protocol doesn't have any clear way of separating
586 * the COPY stream from following data. This is slow, but not any
587 * slower than the code path was originally, and we don't care
588 * much anymore about the performance of old protocol.
590 if (pq_getbytes((char *) databuf, minread))
592 /* Only a \. terminator is legal EOF in old protocol */
594 (errcode(ERRCODE_CONNECTION_FAILURE),
595 errmsg("unexpected EOF on client connection with an open transaction")));
/* keep going until the caller's minimum is satisfied or CopyDone was seen */
600 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
/* the current message is exhausted once its cursor reaches its length */
604 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
606 /* Try to receive another message */
/*
 * Cancel interrupts stay held from before the type byte is read until the
 * message body has been read (presumably so that a cancel cannot leave a
 * message half-consumed -- confirm).
 */
610 HOLD_CANCEL_INTERRUPTS();
612 mtype = pq_getbyte();
615 (errcode(ERRCODE_CONNECTION_FAILURE),
616 errmsg("unexpected EOF on client connection with an open transaction")));
617 if (pq_getmessage(cstate->fe_msgbuf, 0))
619 (errcode(ERRCODE_CONNECTION_FAILURE),
620 errmsg("unexpected EOF on client connection with an open transaction")));
621 RESUME_CANCEL_INTERRUPTS();
624 case 'd': /* CopyData */
626 case 'c': /* CopyDone */
627 /* COPY IN correctly terminated by frontend */
628 cstate->fe_eof = true;
630 case 'f': /* CopyFail */
632 (errcode(ERRCODE_QUERY_CANCELED),
633 errmsg("COPY from stdin failed: %s",
634 pq_getmsgstring(cstate->fe_msgbuf))));
636 case 'H': /* Flush */
640 * Ignore Flush/Sync for the convenience of client
641 * libraries (such as libpq) that may send those
642 * without noticing that the command they just
648 (errcode(ERRCODE_PROTOCOL_VIOLATION),
649 errmsg("unexpected message type 0x%02X during COPY from stdin",
/* bytes still unread in the current message */
654 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
657 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
/* advance the output pointer past what was just copied */
658 databuf = (void *) ((char *) databuf + avail);
670 * These functions do apply some data conversion
674 * CopySendInt32 sends an int32 in network byte order
/*
 * Convert val to network byte order with htonl() and queue the resulting
 * sizeof(buf) bytes through CopySendData().
 */
677 CopySendInt32(CopyState cstate, int32 val)
681 buf = htonl((uint32) val);
682 CopySendData(cstate, &buf, sizeof(buf));
686 * CopyGetInt32 reads an int32 that appears in network byte order
688 * Returns true if OK, false if EOF
/*
 * Request exactly sizeof(buf) bytes from CopyGetData().  On a short read
 * *val is set to 0; otherwise the bytes are converted from network byte
 * order with ntohl() and stored in *val.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
691 CopyGetInt32(CopyState cstate, int32 *val)
695 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
697 *val = 0; /* suppress compiler warning */
700 *val = (int32) ntohl(buf);
705 * CopySendInt16 sends an int16 in network byte order
/*
 * Convert val to network byte order with htons() and queue the resulting
 * sizeof(buf) bytes through CopySendData().
 */
708 CopySendInt16(CopyState cstate, int16 val)
712 buf = htons((uint16) val);
713 CopySendData(cstate, &buf, sizeof(buf));
717 * CopyGetInt16 reads an int16 that appears in network byte order
/*
 * Request exactly sizeof(buf) bytes from CopyGetData().  On a short read
 * *val is set to 0; otherwise the bytes are converted from network byte
 * order with ntohs() and stored in *val.
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably true on success and false on EOF, like CopyGetInt32 -- confirm.
 */
720 CopyGetInt16(CopyState cstate, int16 *val)
724 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
726 *val = 0; /* suppress compiler warning */
729 *val = (int16) ntohs(buf);
735 * CopyLoadRawBuf loads some more data into raw_buf
737 * Returns TRUE if able to obtain at least one more byte, else FALSE.
739 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
740 * down to the start of the buffer and then we load more data after that.
741 * This case is used only when a frontend multibyte character crosses a
742 * bufferload boundary.
/*
 * Refill cstate->raw_buf.  Any bytes between raw_buf_index and raw_buf_len
 * are first slid down to offset 0 with memmove() (the regions may overlap),
 * then CopyGetData() is asked for at least 1 and at most
 * RAW_BUF_SIZE - nbytes further bytes, written right after the saved ones.
 * Afterwards a '\0' is stored at raw_buf[nbytes], raw_buf_index is reset to
 * 0 and raw_buf_len is set to nbytes.  The result is true when CopyGetData()
 * returned more than zero bytes.
 *
 * NOTE(review): the statement that folds inbytes into nbytes before the
 * terminator is stored is not visible in this excerpt -- confirm it is
 * present in the real file.
 */
745 CopyLoadRawBuf(CopyState cstate)
750 if (cstate->raw_buf_index < cstate->raw_buf_len)
752 /* Copy down the unprocessed data */
753 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
754 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
758 nbytes = 0; /* no data need be saved */
760 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
761 1, RAW_BUF_SIZE - nbytes);
763 cstate->raw_buf[nbytes] = '\0';
764 cstate->raw_buf_index = 0;
765 cstate->raw_buf_len = nbytes;
766 return (inbytes > 0);
771 * DoCopy executes the SQL COPY statement
773 * Either unload or reload contents of table <relation>, depending on <from>.
774 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
775 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
778 * If <pipe> is false, transfer is between the table and the file named
779 * <filename>. Otherwise, transfer is between the table and our regular
780 * input/output stream. The latter could be either stdin/stdout or a
781 * socket, depending on whether we're running under Postmaster control.
783 * Do not allow a Postgres user without superuser privilege to read from
784 * or write to a file.
786 * Do not allow the copy if user doesn't have proper permission to access
787 * the table or the specifically requested columns.
/*
 * Outline of DoCopy as visible here:
 *
 * 1. `pipe` is true when stmt->filename is NULL.  When it is false (a file
 *    or program target), the caller must be a superuser.
 * 2. For a named relation: open and lock it (RowExclusiveLock for COPY FROM,
 *    AccessShareLock for COPY TO), build a one-entry range table asking for
 *    ACL_INSERT or ACL_SELECT on the affected columns, and check it with
 *    ExecCheckRTPerms().
 * 3. If check_enable_rls() reports RLS_ENABLED, COPY FROM is rejected and
 *    COPY TO is turned into a SelectStmt over the same relation; the
 *    relation is closed but its lock is kept.
 * 4. Run CopyFrom() or DoCopyTo() and store the returned count in
 *    *processed.
 *
 * NOTE(review): several branch conditions, declarations and the return
 * statement are not visible in this excerpt.
 */
790 DoCopy(ParseState *pstate, const CopyStmt *stmt, uint64 *processed)
793 bool is_from = stmt->is_from;
794 bool pipe = (stmt->filename == NULL);
798 List *range_table = NIL;
800 /* Disallow COPY to/from file or program except to superusers. */
801 if (!pipe && !superuser())
803 if (stmt->is_program)
805 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
806 errmsg("must be superuser to COPY to or from an external program"),
807 errhint("Anyone can COPY to stdout or from stdin. "
808 "psql's \\copy command also works for anyone.")));
811 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
812 errmsg("must be superuser to COPY to or from a file"),
813 errhint("Anyone can COPY to stdout or from stdin. "
814 "psql's \\copy command also works for anyone.")));
/* loading needs INSERT privilege, unloading needs SELECT */
820 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
825 Assert(!stmt->query);
827 /* Open and lock the relation, using the appropriate lock type. */
828 rel = heap_openrv(stmt->relation,
829 (is_from ? RowExclusiveLock : AccessShareLock));
831 relid = RelationGetRelid(rel);
/* synthesize a range-table entry so the standard permission check can be reused */
833 rte = makeNode(RangeTblEntry);
834 rte->rtekind = RTE_RELATION;
835 rte->relid = RelationGetRelid(rel);
836 rte->relkind = rel->rd_rel->relkind;
837 rte->requiredPerms = required_access;
838 range_table = list_make1(rte);
840 tupDesc = RelationGetDescr(rel);
841 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
/*
 * Each attnum is shifted by FirstLowInvalidHeapAttributeNumber before it is
 * added to the insertedCols / selectedCols bitmapset.
 */
842 foreach(cur, attnums)
844 int attno = lfirst_int(cur) -
845 FirstLowInvalidHeapAttributeNumber;
848 rte->insertedCols = bms_add_member(rte->insertedCols, attno);
850 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
852 ExecCheckRTPerms(range_table, true);
855 * Permission check for row security policies.
857 * check_enable_rls will ereport(ERROR) if the user has requested
858 * something invalid and will otherwise indicate if we should enable
859 * RLS (returns RLS_ENABLED) or not for this COPY statement.
861 * If the relation has a row security policy and we are to apply it
862 * then perform a "query" copy and allow the normal query processing
863 * to handle the policies.
865 * If RLS is not enabled for this, then just fall through to the
866 * normal non-filtering relation handling.
868 if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
877 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
878 errmsg("COPY FROM not supported with row-level security"),
879 errhint("Use INSERT statements instead.")));
881 /* Build target list */
882 cr = makeNode(ColumnRef);
/* select either every column (A_Star) or exactly the columns named in the statement */
885 cr->fields = list_make1(makeNode(A_Star));
887 cr->fields = stmt->attlist;
891 target = makeNode(ResTarget);
893 target->indirection = NIL;
894 target->val = (Node *) cr;
895 target->location = 1;
898 * Build RangeVar for from clause, fully qualified based on the
899 * relation which we have opened and locked.
901 from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
902 RelationGetRelationName(rel), -1);
905 select = makeNode(SelectStmt);
906 select->targetList = list_make1(target);
907 select->fromClause = list_make1(from);
909 query = (Node *) select;
912 * Close the relation for now, but keep the lock on it to prevent
913 * changes between now and when we start the query-based COPY.
915 * We'll reopen it later as part of the query-based COPY.
917 heap_close(rel, NoLock);
934 /* check read-only transaction and parallel mode */
935 if (XactReadOnly && !rel->rd_islocaltemp)
936 PreventCommandIfReadOnly("COPY FROM");
937 PreventCommandIfParallelMode("COPY FROM");
939 cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
940 stmt->attlist, stmt->options);
/* hand the permission-checked range table to the COPY FROM state */
941 cstate->range_table = range_table;
942 *processed = CopyFrom(cstate); /* copy from file to database */
947 cstate = BeginCopyTo(pstate, rel, query, relid,
948 stmt->filename, stmt->is_program,
949 stmt->attlist, stmt->options);
950 *processed = DoCopyTo(cstate); /* copy from database to file */
955 * Close the relation. If reading, we can release the AccessShareLock we
956 * got; if writing, we should hold the lock until end of transaction to
957 * ensure that updates will be committed before lock is released.
960 heap_close(rel, (is_from ? NoLock : AccessShareLock));
966 * Process the statement option list for COPY.
968 * Scan the options list (a list of DefElem) and transpose the information
969 * into cstate, applying appropriate error checking.
971 * cstate is assumed to be filled with zeroes initially.
973 * This is exported so that external users of the COPY API can sanity-check
974 * a list of options. In that usage, cstate should be passed as NULL
975 * (since external users don't know sizeof(CopyStateData)) and the collected
976 * data is just leaked until CurrentMemoryContext is reset.
978 * Note that additional checking, such as whether column names listed in FORCE
979 * QUOTE actually exist, has to be applied later. This just checks for
980 * self-consistency of the options list.
983 ProcessCopyOptions(ParseState *pstate,
988 bool format_specified = false;
991 /* Support external use for option sanity checking */
993 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
995 cstate->file_encoding = -1;
997 /* Extract options from the statement node tree */
998 foreach(option, options)
1000 DefElem *defel = (DefElem *) lfirst(option);
1002 if (strcmp(defel->defname, "format") == 0)
1004 char *fmt = defGetString(defel);
1006 if (format_specified)
1008 (errcode(ERRCODE_SYNTAX_ERROR),
1009 errmsg("conflicting or redundant options"),
1010 parser_errposition(pstate, defel->location)));
1011 format_specified = true;
1012 if (strcmp(fmt, "text") == 0)
1013 /* default format */ ;
1014 else if (strcmp(fmt, "csv") == 0)
1015 cstate->csv_mode = true;
1016 else if (strcmp(fmt, "binary") == 0)
1017 cstate->binary = true;
1020 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1021 errmsg("COPY format \"%s\" not recognized", fmt),
1022 parser_errposition(pstate, defel->location)));
1024 else if (strcmp(defel->defname, "oids") == 0)
1028 (errcode(ERRCODE_SYNTAX_ERROR),
1029 errmsg("conflicting or redundant options"),
1030 parser_errposition(pstate, defel->location)));
1031 cstate->oids = defGetBoolean(defel);
1033 else if (strcmp(defel->defname, "freeze") == 0)
1037 (errcode(ERRCODE_SYNTAX_ERROR),
1038 errmsg("conflicting or redundant options"),
1039 parser_errposition(pstate, defel->location)));
1040 cstate->freeze = defGetBoolean(defel);
1042 else if (strcmp(defel->defname, "delimiter") == 0)
1046 (errcode(ERRCODE_SYNTAX_ERROR),
1047 errmsg("conflicting or redundant options"),
1048 parser_errposition(pstate, defel->location)));
1049 cstate->delim = defGetString(defel);
1051 else if (strcmp(defel->defname, "null") == 0)
1053 if (cstate->null_print)
1055 (errcode(ERRCODE_SYNTAX_ERROR),
1056 errmsg("conflicting or redundant options"),
1057 parser_errposition(pstate, defel->location)));
1058 cstate->null_print = defGetString(defel);
1060 else if (strcmp(defel->defname, "header") == 0)
1062 if (cstate->header_line)
1064 (errcode(ERRCODE_SYNTAX_ERROR),
1065 errmsg("conflicting or redundant options"),
1066 parser_errposition(pstate, defel->location)));
1067 cstate->header_line = defGetBoolean(defel);
1069 else if (strcmp(defel->defname, "quote") == 0)
1073 (errcode(ERRCODE_SYNTAX_ERROR),
1074 errmsg("conflicting or redundant options"),
1075 parser_errposition(pstate, defel->location)));
1076 cstate->quote = defGetString(defel);
1078 else if (strcmp(defel->defname, "escape") == 0)
1082 (errcode(ERRCODE_SYNTAX_ERROR),
1083 errmsg("conflicting or redundant options"),
1084 parser_errposition(pstate, defel->location)));
1085 cstate->escape = defGetString(defel);
1087 else if (strcmp(defel->defname, "force_quote") == 0)
1089 if (cstate->force_quote || cstate->force_quote_all)
1091 (errcode(ERRCODE_SYNTAX_ERROR),
1092 errmsg("conflicting or redundant options"),
1093 parser_errposition(pstate, defel->location)));
1094 if (defel->arg && IsA(defel->arg, A_Star))
1095 cstate->force_quote_all = true;
1096 else if (defel->arg && IsA(defel->arg, List))
1097 cstate->force_quote = (List *) defel->arg;
1100 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1101 errmsg("argument to option \"%s\" must be a list of column names",
1103 parser_errposition(pstate, defel->location)));
1105 else if (strcmp(defel->defname, "force_not_null") == 0)
1107 if (cstate->force_notnull)
1109 (errcode(ERRCODE_SYNTAX_ERROR),
1110 errmsg("conflicting or redundant options"),
1111 parser_errposition(pstate, defel->location)));
1112 if (defel->arg && IsA(defel->arg, List))
1113 cstate->force_notnull = (List *) defel->arg;
1116 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1117 errmsg("argument to option \"%s\" must be a list of column names",
1119 parser_errposition(pstate, defel->location)));
1121 else if (strcmp(defel->defname, "force_null") == 0)
1123 if (cstate->force_null)
1125 (errcode(ERRCODE_SYNTAX_ERROR),
1126 errmsg("conflicting or redundant options")));
1127 if (defel->arg && IsA(defel->arg, List))
1128 cstate->force_null = (List *) defel->arg;
1131 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1132 errmsg("argument to option \"%s\" must be a list of column names",
1134 parser_errposition(pstate, defel->location)));
1136 else if (strcmp(defel->defname, "convert_selectively") == 0)
1139 * Undocumented, not-accessible-from-SQL option: convert only the
1140 * named columns to binary form, storing the rest as NULLs. It's
1141 * allowed for the column list to be NIL.
1143 if (cstate->convert_selectively)
1145 (errcode(ERRCODE_SYNTAX_ERROR),
1146 errmsg("conflicting or redundant options"),
1147 parser_errposition(pstate, defel->location)));
1148 cstate->convert_selectively = true;
1149 if (defel->arg == NULL || IsA(defel->arg, List))
1150 cstate->convert_select = (List *) defel->arg;
1153 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1154 errmsg("argument to option \"%s\" must be a list of column names",
1156 parser_errposition(pstate, defel->location)));
1158 else if (strcmp(defel->defname, "encoding") == 0)
1160 if (cstate->file_encoding >= 0)
1162 (errcode(ERRCODE_SYNTAX_ERROR),
1163 errmsg("conflicting or redundant options"),
1164 parser_errposition(pstate, defel->location)));
1165 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1166 if (cstate->file_encoding < 0)
1168 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1169 errmsg("argument to option \"%s\" must be a valid encoding name",
1171 parser_errposition(pstate, defel->location)));
1175 (errcode(ERRCODE_SYNTAX_ERROR),
1176 errmsg("option \"%s\" not recognized",
1178 parser_errposition(pstate, defel->location)));
1182 * Check for incompatible options (must do these two before inserting
1185 if (cstate->binary && cstate->delim)
1187 (errcode(ERRCODE_SYNTAX_ERROR),
1188 errmsg("cannot specify DELIMITER in BINARY mode")));
1190 if (cstate->binary && cstate->null_print)
1192 (errcode(ERRCODE_SYNTAX_ERROR),
1193 errmsg("cannot specify NULL in BINARY mode")));
1195 /* Set defaults for omitted options */
1197 cstate->delim = cstate->csv_mode ? "," : "\t";
1199 if (!cstate->null_print)
1200 cstate->null_print = cstate->csv_mode ? "" : "\\N";
1201 cstate->null_print_len = strlen(cstate->null_print);
1203 if (cstate->csv_mode)
1206 cstate->quote = "\"";
1207 if (!cstate->escape)
1208 cstate->escape = cstate->quote;
1211 /* Only single-byte delimiter strings are supported. */
1212 if (strlen(cstate->delim) != 1)
1214 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1215 errmsg("COPY delimiter must be a single one-byte character")));
1217 /* Disallow end-of-line characters */
1218 if (strchr(cstate->delim, '\r') != NULL ||
1219 strchr(cstate->delim, '\n') != NULL)
1221 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1222 errmsg("COPY delimiter cannot be newline or carriage return")));
1224 if (strchr(cstate->null_print, '\r') != NULL ||
1225 strchr(cstate->null_print, '\n') != NULL)
1227 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1228 errmsg("COPY null representation cannot use newline or carriage return")));
1231 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1232 * backslash because it would be ambiguous. We can't allow the other
1233 * cases because data characters matching the delimiter must be
1234 * backslashed, and certain backslash combinations are interpreted
1235 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1236 * more than strictly necessary, but seems best for consistency and
1237 * future-proofing. Likewise we disallow all digits though only octal
1238 * digits are actually dangerous.
1240 if (!cstate->csv_mode &&
1241 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1242 cstate->delim[0]) != NULL)
1244 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1245 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1248 if (!cstate->csv_mode && cstate->header_line)
1250 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1251 errmsg("COPY HEADER available only in CSV mode")));
1254 if (!cstate->csv_mode && cstate->quote != NULL)
1256 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1257 errmsg("COPY quote available only in CSV mode")));
1259 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1261 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1262 errmsg("COPY quote must be a single one-byte character")));
1264 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1266 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1267 errmsg("COPY delimiter and quote must be different")));
1270 if (!cstate->csv_mode && cstate->escape != NULL)
1272 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1273 errmsg("COPY escape available only in CSV mode")));
1275 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1277 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1278 errmsg("COPY escape must be a single one-byte character")));
1280 /* Check force_quote */
1281 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1283 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1284 errmsg("COPY force quote available only in CSV mode")));
1285 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1287 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1288 errmsg("COPY force quote only available using COPY TO")));
1290 /* Check force_notnull */
1291 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1293 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1294 errmsg("COPY force not null available only in CSV mode")));
1295 if (cstate->force_notnull != NIL && !is_from)
1297 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1298 errmsg("COPY force not null only available using COPY FROM")));
1300 /* Check force_null */
1301 if (!cstate->csv_mode && cstate->force_null != NIL)
1303 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1304 errmsg("COPY force null available only in CSV mode")));
1306 if (cstate->force_null != NIL && !is_from)
1308 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1309 errmsg("COPY force null only available using COPY FROM")));
1311 /* Don't allow the delimiter to appear in the null string. */
1312 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1314 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1315 errmsg("COPY delimiter must not appear in the NULL specification")));
1317 /* Don't allow the CSV quote char to appear in the null string. */
1318 if (cstate->csv_mode &&
1319 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1321 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1322 errmsg("CSV quote character must not appear in the NULL specification")));
1326 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1328 * Iff <binary>, unload or reload in the binary format, as opposed to the
1329 * more wasteful but more robust and portable text format.
1331 * Iff <oids>, unload or reload the format that includes OID information.
1332 * On input, we accept OIDs whether or not the table has an OID column,
1333 * but silently drop them if it does not. On output, we report an error
1334 * if the user asks for OIDs in a table that has none (not providing an
1335 * OID column might seem friendlier, but could seriously confuse programs).
1337 * If in the text format, delimit columns with delimiter <delim> and print
1338 * NULL values as <null_print>.
1341 BeginCopy(ParseState *pstate,
1345 const Oid queryRelId,
1352 MemoryContext oldcontext;
1354 /* Allocate workspace and zero all fields */
1355 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1358 * We allocate everything used by a cstate in a new memory context. This
1359 * avoids memory leaks during repeated use of COPY in a query.
1361 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1363 ALLOCSET_DEFAULT_SIZES);
1365 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1367 /* Extract options from the statement node tree */
1368 ProcessCopyOptions(pstate, cstate, is_from, options);
1370 /* Process the source/target relation or query */
1377 tupDesc = RelationGetDescr(cstate->rel);
1379 /* Don't allow COPY w/ OIDs to or from a table without them */
1380 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1382 (errcode(ERRCODE_UNDEFINED_COLUMN),
1383 errmsg("table \"%s\" does not have OIDs",
1384 RelationGetRelationName(cstate->rel))));
1396 /* Don't allow COPY w/ OIDs from a query */
1399 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1400 errmsg("COPY (query) WITH OIDS is not supported")));
1403 * Run parse analysis and rewrite. Note this also acquires sufficient
1404 * locks on the source table(s).
1406 * Because the parser and planner tend to scribble on their input, we
1407 * make a preliminary copy of the source querytree. This prevents
1408 * problems in the case that the COPY is in a portal or plpgsql
1409 * function and is executed repeatedly. (See also the same hack in
1410 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1412 rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
1413 pstate->p_sourcetext, NULL, 0);
1415 /* check that we got back something we can work with */
1416 if (rewritten == NIL)
1419 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1420 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1422 else if (list_length(rewritten) > 1)
1426 /* examine queries to determine which error message to issue */
1427 foreach(lc, rewritten)
1429 Query *q = (Query *) lfirst(lc);
1431 if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1433 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1434 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1435 if (q->querySource == QSRC_NON_INSTEAD_RULE)
1437 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1438 errmsg("DO ALSO rules are not supported for the COPY")));
1442 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1443 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1446 query = (Query *) linitial(rewritten);
1448 /* The grammar allows SELECT INTO, but we don't support that */
1449 if (query->utilityStmt != NULL &&
1450 IsA(query->utilityStmt, CreateTableAsStmt))
1452 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1453 errmsg("COPY (SELECT INTO) is not supported")));
1455 Assert(query->utilityStmt == NULL);
1458 * Similarly the grammar doesn't enforce the presence of a RETURNING
1459 * clause, but this is required here.
1461 if (query->commandType != CMD_SELECT &&
1462 query->returningList == NIL)
1464 Assert(query->commandType == CMD_INSERT ||
1465 query->commandType == CMD_UPDATE ||
1466 query->commandType == CMD_DELETE);
1469 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1470 errmsg("COPY query must have a RETURNING clause")));
1473 /* plan the query */
1474 plan = pg_plan_query(query, 0, NULL);
1477 * With row level security and a user using "COPY relation TO", we
1478 * have to convert the "COPY relation TO" to a query-based COPY (eg:
1479 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1480 * in any RLS clauses.
1482 * When this happens, we are passed in the relid of the originally
1483 * found relation (which we have locked). As the planner will look up
1484 * the relation again, we double-check here to make sure it found the
1485 * same one that we have locked.
1487 if (queryRelId != InvalidOid)
1490 * Note that with RLS involved there may be multiple relations,
1491 * and while the one we need is almost certainly first, we don't
1492 * make any guarantees of that in the planner, so check the whole
1493 * list and make sure we find the original relation.
1495 if (!list_member_oid(plan->relationOids, queryRelId))
1497 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1498 errmsg("relation referenced by COPY statement has changed")));
1502 * Use a snapshot with an updated command ID to ensure this query sees
1503 * results of any previously executed queries.
1505 PushCopiedSnapshot(GetActiveSnapshot());
1506 UpdateActiveSnapshotCommandId();
1508 /* Create dest receiver for COPY OUT */
1509 dest = CreateDestReceiver(DestCopyOut);
1510 ((DR_copy *) dest)->cstate = cstate;
1512 /* Create a QueryDesc requesting no output */
1513 cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1514 GetActiveSnapshot(),
1519 * Call ExecutorStart to prepare the plan for execution.
1521 * ExecutorStart computes a result tupdesc for us
1523 ExecutorStart(cstate->queryDesc, 0);
1525 tupDesc = cstate->queryDesc->tupDesc;
1528 /* Generate or convert list of attributes to process */
1529 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1531 num_phys_attrs = tupDesc->natts;
1533 /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1534 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1535 if (cstate->force_quote_all)
1539 for (i = 0; i < num_phys_attrs; i++)
1540 cstate->force_quote_flags[i] = true;
1542 else if (cstate->force_quote)
1547 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1549 foreach(cur, attnums)
1551 int attnum = lfirst_int(cur);
1553 if (!list_member_int(cstate->attnumlist, attnum))
1555 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1556 errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1557 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1558 cstate->force_quote_flags[attnum - 1] = true;
1562 /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1563 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1564 if (cstate->force_notnull)
1569 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1571 foreach(cur, attnums)
1573 int attnum = lfirst_int(cur);
1575 if (!list_member_int(cstate->attnumlist, attnum))
1577 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1578 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1579 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1580 cstate->force_notnull_flags[attnum - 1] = true;
1584 /* Convert FORCE_NULL name list to per-column flags, check validity */
1585 cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1586 if (cstate->force_null)
1591 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1593 foreach(cur, attnums)
1595 int attnum = lfirst_int(cur);
1597 if (!list_member_int(cstate->attnumlist, attnum))
1599 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1600 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1601 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1602 cstate->force_null_flags[attnum - 1] = true;
1606 /* Convert convert_selectively name list to per-column flags */
1607 if (cstate->convert_selectively)
1612 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1614 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1616 foreach(cur, attnums)
1618 int attnum = lfirst_int(cur);
1620 if (!list_member_int(cstate->attnumlist, attnum))
1622 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1623 errmsg_internal("selected column \"%s\" not referenced by COPY",
1624 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1625 cstate->convert_select_flags[attnum - 1] = true;
1629 /* Use client encoding when ENCODING option is not specified. */
1630 if (cstate->file_encoding < 0)
1631 cstate->file_encoding = pg_get_client_encoding();
1634 * Set up encoding conversion info. Even if the file and server encodings
1635 * are the same, we must apply pg_any_to_server() to validate data in
1636 * multibyte encodings.
1638 cstate->need_transcoding =
1639 (cstate->file_encoding != GetDatabaseEncoding() ||
1640 pg_database_encoding_max_length() > 1);
1641 /* See Multibyte encoding comment above */
1642 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1644 cstate->copy_dest = COPY_FILE; /* default */
1646 MemoryContextSwitchTo(oldcontext);
1652 * Closes the pipe to an external program, checking the pclose() return code.
1655 ClosePipeToProgram(CopyState cstate)
1659 Assert(cstate->is_program);
1661 pclose_rc = ClosePipeStream(cstate->copy_file);
1662 if (pclose_rc == -1)
1664 (errcode_for_file_access(),
1665 errmsg("could not close pipe to external command: %m")));
1666 else if (pclose_rc != 0)
1668 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1669 errmsg("program \"%s\" failed",
1671 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1675 * Release resources allocated in a cstate for COPY TO/FROM.
1678 EndCopy(CopyState cstate)
1680 if (cstate->is_program)
1682 ClosePipeToProgram(cstate);
1686 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1688 (errcode_for_file_access(),
1689 errmsg("could not close file \"%s\": %m",
1690 cstate->filename)));
1693 MemoryContextDelete(cstate->copycontext);
1698 * Setup CopyState to read tuples from a table or a query for COPY TO.
1701 BeginCopyTo(ParseState *pstate,
1704 const Oid queryRelId,
1705 const char *filename,
1711 bool pipe = (filename == NULL);
1712 MemoryContext oldcontext;
1714 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1716 if (rel->rd_rel->relkind == RELKIND_VIEW)
1718 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1719 errmsg("cannot copy from view \"%s\"",
1720 RelationGetRelationName(rel)),
1721 errhint("Try the COPY (SELECT ...) TO variant.")));
1722 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1724 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1725 errmsg("cannot copy from materialized view \"%s\"",
1726 RelationGetRelationName(rel)),
1727 errhint("Try the COPY (SELECT ...) TO variant.")));
1728 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1730 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1731 errmsg("cannot copy from foreign table \"%s\"",
1732 RelationGetRelationName(rel)),
1733 errhint("Try the COPY (SELECT ...) TO variant.")));
1734 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1736 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1737 errmsg("cannot copy from sequence \"%s\"",
1738 RelationGetRelationName(rel))));
1741 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1742 errmsg("cannot copy from non-table relation \"%s\"",
1743 RelationGetRelationName(rel))));
1746 cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1748 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1752 Assert(!is_program); /* the grammar does not allow this */
1753 if (whereToSendOutput != DestRemote)
1754 cstate->copy_file = stdout;
1758 cstate->filename = pstrdup(filename);
1759 cstate->is_program = is_program;
1763 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1764 if (cstate->copy_file == NULL)
1766 (errcode_for_file_access(),
1767 errmsg("could not execute command \"%s\": %m",
1768 cstate->filename)));
1772 mode_t oumask; /* Pre-existing umask value */
1776 * Prevent write to relative path ... too easy to shoot oneself in
1777 * the foot by overwriting a database file ...
1779 if (!is_absolute_path(filename))
1781 (errcode(ERRCODE_INVALID_NAME),
1782 errmsg("relative path not allowed for COPY to file")));
1784 oumask = umask(S_IWGRP | S_IWOTH);
1785 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1787 if (cstate->copy_file == NULL)
1789 (errcode_for_file_access(),
1790 errmsg("could not open file \"%s\" for writing: %m",
1791 cstate->filename)));
1793 if (fstat(fileno(cstate->copy_file), &st))
1795 (errcode_for_file_access(),
1796 errmsg("could not stat file \"%s\": %m",
1797 cstate->filename)));
1799 if (S_ISDIR(st.st_mode))
1801 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1802 errmsg("\"%s\" is a directory", cstate->filename)));
1806 MemoryContextSwitchTo(oldcontext);
1812 * This intermediate routine exists mainly to localize the effects of setjmp
1813 * so we don't need to plaster a lot of variables with "volatile".
1816 DoCopyTo(CopyState cstate)
1818 bool pipe = (cstate->filename == NULL);
1819 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1825 SendCopyBegin(cstate);
1827 processed = CopyTo(cstate);
1830 SendCopyEnd(cstate);
1835 * Make sure we turn off old-style COPY OUT mode upon error. It is
1836 * okay to do this in all cases, since it does nothing if the mode is
1839 pq_endcopyout(true);
1848 * Clean up storage and release resources for COPY TO.
1851 EndCopyTo(CopyState cstate)
1853 if (cstate->queryDesc != NULL)
1855 /* Close down the query and free resources. */
1856 ExecutorFinish(cstate->queryDesc);
1857 ExecutorEnd(cstate->queryDesc);
1858 FreeQueryDesc(cstate->queryDesc);
1859 PopActiveSnapshot();
1862 /* Clean up storage */
1867 * Copy from relation or query TO file.
1870 CopyTo(CopyState cstate)
1874 Form_pg_attribute *attr;
1879 tupDesc = RelationGetDescr(cstate->rel);
1881 tupDesc = cstate->queryDesc->tupDesc;
1882 attr = tupDesc->attrs;
1883 num_phys_attrs = tupDesc->natts;
1884 cstate->null_print_client = cstate->null_print; /* default */
1886 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1887 cstate->fe_msgbuf = makeStringInfo();
1889 /* Get info about the columns we need to process. */
1890 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1891 foreach(cur, cstate->attnumlist)
1893 int attnum = lfirst_int(cur);
1898 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1902 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1905 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1909 * Create a temporary memory context that we can reset once per row to
1910 * recover palloc'd memory. This avoids any problems with leaks inside
1911 * datatype output routines, and should be faster than retail pfree's
1912 * anyway. (We don't need a whole econtext as CopyFrom does.)
1914 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1916 ALLOCSET_DEFAULT_SIZES);
1920 /* Generate header for a binary copy */
1924 CopySendData(cstate, BinarySignature, 11);
1929 CopySendInt32(cstate, tmp);
1930 /* No header extension */
1932 CopySendInt32(cstate, tmp);
1937 * For non-binary copy, we need to convert null_print to file
1938 * encoding, because it will be sent directly with CopySendString.
1940 if (cstate->need_transcoding)
1941 cstate->null_print_client = pg_server_to_any(cstate->null_print,
1942 cstate->null_print_len,
1943 cstate->file_encoding);
1945 /* if a header has been requested send the line */
1946 if (cstate->header_line)
1948 bool hdr_delim = false;
1950 foreach(cur, cstate->attnumlist)
1952 int attnum = lfirst_int(cur);
1956 CopySendChar(cstate, cstate->delim[0]);
1959 colname = NameStr(attr[attnum - 1]->attname);
1961 CopyAttributeOutCSV(cstate, colname, false,
1962 list_length(cstate->attnumlist) == 1);
1965 CopySendEndOfRow(cstate);
1973 HeapScanDesc scandesc;
1976 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1977 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
1979 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
1982 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
1984 CHECK_FOR_INTERRUPTS();
1986 /* Deconstruct the tuple ... faster than repeated heap_getattr */
1987 heap_deform_tuple(tuple, tupDesc, values, nulls);
1989 /* Format and send the data */
1990 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
1994 heap_endscan(scandesc);
2001 /* run the plan --- the dest receiver will send tuples */
2002 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
2003 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2008 /* Generate trailer for a binary copy */
2009 CopySendInt16(cstate, -1);
2010 /* Need to flush out the trailer */
2011 CopySendEndOfRow(cstate);
2014 MemoryContextDelete(cstate->rowcontext);
2020 * Emit one row during CopyTo().
2023 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2025 bool need_delim = false;
2026 FmgrInfo *out_functions = cstate->out_functions;
2027 MemoryContext oldcontext;
2031 MemoryContextReset(cstate->rowcontext);
2032 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2036 /* Binary per-tuple header */
2037 CopySendInt16(cstate, list_length(cstate->attnumlist));
2038 /* Send OID if wanted --- note attnumlist doesn't include it */
2041 /* Hack --- assume Oid is same size as int32 */
2042 CopySendInt32(cstate, sizeof(int32));
2043 CopySendInt32(cstate, tupleOid);
2048 /* Text format has no per-tuple header, but send OID if wanted */
2049 /* Assume digits don't need any quoting or encoding conversion */
2052 string = DatumGetCString(DirectFunctionCall1(oidout,
2053 ObjectIdGetDatum(tupleOid)));
2054 CopySendString(cstate, string);
2059 foreach(cur, cstate->attnumlist)
2061 int attnum = lfirst_int(cur);
2062 Datum value = values[attnum - 1];
2063 bool isnull = nulls[attnum - 1];
2065 if (!cstate->binary)
2068 CopySendChar(cstate, cstate->delim[0]);
2074 if (!cstate->binary)
2075 CopySendString(cstate, cstate->null_print_client);
2077 CopySendInt32(cstate, -1);
2081 if (!cstate->binary)
2083 string = OutputFunctionCall(&out_functions[attnum - 1],
2085 if (cstate->csv_mode)
2086 CopyAttributeOutCSV(cstate, string,
2087 cstate->force_quote_flags[attnum - 1],
2088 list_length(cstate->attnumlist) == 1);
2090 CopyAttributeOutText(cstate, string);
2096 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2098 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2099 CopySendData(cstate, VARDATA(outputbytes),
2100 VARSIZE(outputbytes) - VARHDRSZ);
2105 CopySendEndOfRow(cstate);
2107 MemoryContextSwitchTo(oldcontext);
2112 * error context callback for COPY FROM
2114 * The argument for the error context must be CopyState.
2117 CopyFromErrorCallback(void *arg)
2119 CopyState cstate = (CopyState) arg;
2123 /* can't usefully display the data */
2124 if (cstate->cur_attname)
2125 errcontext("COPY %s, line %d, column %s",
2126 cstate->cur_relname, cstate->cur_lineno,
2127 cstate->cur_attname);
2129 errcontext("COPY %s, line %d",
2130 cstate->cur_relname, cstate->cur_lineno);
2134 if (cstate->cur_attname && cstate->cur_attval)
2136 /* error is relevant to a particular column */
2139 attval = limit_printout_length(cstate->cur_attval);
2140 errcontext("COPY %s, line %d, column %s: \"%s\"",
2141 cstate->cur_relname, cstate->cur_lineno,
2142 cstate->cur_attname, attval);
2145 else if (cstate->cur_attname)
2147 /* error is relevant to a particular column, value is NULL */
2148 errcontext("COPY %s, line %d, column %s: null input",
2149 cstate->cur_relname, cstate->cur_lineno,
2150 cstate->cur_attname);
2155 * Error is relevant to a particular line.
2157 * If line_buf still contains the correct line, and it's already
2158 * transcoded, print it. If it's still in a foreign encoding, it's
2159 * quite likely that the error is precisely a failure to do
2160 * encoding conversion (ie, bad data). We dare not try to convert
2161 * it, and at present there's no way to regurgitate it without
2162 * conversion. So we have to punt and just report the line number.
2164 if (cstate->line_buf_valid &&
2165 (cstate->line_buf_converted || !cstate->need_transcoding))
2169 lineval = limit_printout_length(cstate->line_buf.data);
2170 errcontext("COPY %s, line %d: \"%s\"",
2171 cstate->cur_relname, cstate->cur_lineno, lineval);
2176 errcontext("COPY %s, line %d",
2177 cstate->cur_relname, cstate->cur_lineno);
/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * Using the sprintf "precision" limit to truncate would look simpler, but
 * some versions of glibc have a bug/misfeature whereby vsnprintf always
 * fails (returns -1) when asked to truncate a string containing byte
 * sequences that are invalid in the current encoding.  So the truncation is
 * done by hand here.
 *
 * The result is always a freshly palloc'd string: a plain copy when the
 * input is at most MAX_COPY_DATA_DISPLAY bytes long, otherwise a prefix
 * clipped by pg_mbcliplen() with "..." appended.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			total = strlen(str);
	int			keep;
	char	   *clipped;

	if (total > MAX_COPY_DATA_DISPLAY)
	{
		/* Clip in an encoding-aware way */
		keep = pg_mbcliplen(str, total, MAX_COPY_DATA_DISPLAY);

		/* Room for the kept bytes, the three dots and the terminator */
		clipped = (char *) palloc(keep + 4);
		memcpy(clipped, str, keep);
		memcpy(clipped + keep, "...", 4);

		return clipped;
	}

	/* Short enough already: hand back an unmodified copy */
	return pstrdup(str);
}
2219 * Copy FROM file to relation.
2222 CopyFrom(CopyState cstate)
2228 ResultRelInfo *resultRelInfo;
2229 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2230 ExprContext *econtext;
2231 TupleTableSlot *myslot;
2232 MemoryContext oldcontext = CurrentMemoryContext;
2234 ErrorContextCallback errcallback;
2235 CommandId mycid = GetCurrentCommandId(true);
2236 int hi_options = 0; /* start with default heap_insert options */
2237 BulkInsertState bistate;
2238 uint64 processed = 0;
2239 bool useHeapMultiInsert;
2240 int nBufferedTuples = 0;
2242 #define MAX_BUFFERED_TUPLES 1000
2243 HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
2244 Size bufferedTuplesSize = 0;
2245 int firstBufferedLineNo = 0;
2247 Assert(cstate->rel);
2249 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
2251 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2253 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2254 errmsg("cannot copy to view \"%s\"",
2255 RelationGetRelationName(cstate->rel))));
2256 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2258 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2259 errmsg("cannot copy to materialized view \"%s\"",
2260 RelationGetRelationName(cstate->rel))));
2261 else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2263 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2264 errmsg("cannot copy to foreign table \"%s\"",
2265 RelationGetRelationName(cstate->rel))));
2266 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2268 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2269 errmsg("cannot copy to sequence \"%s\"",
2270 RelationGetRelationName(cstate->rel))));
2273 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2274 errmsg("cannot copy to non-table relation \"%s\"",
2275 RelationGetRelationName(cstate->rel))));
2278 tupDesc = RelationGetDescr(cstate->rel);
2281 * Check to see if we can avoid writing WAL
2283 * If archive logging/streaming is not enabled *and* either
2284 * - table was created in same transaction as this COPY
2285 * - data is being written to relfilenode created in this transaction
2286 * then we can skip writing WAL. It's safe because if the transaction
2287 * doesn't commit, we'll discard the table (or the new relfilenode file).
2288 * If it does commit, we'll have done the heap_sync at the bottom of this
2291 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2292 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2293 * can be cleared before the end of the transaction. The exact case is
2294 * when a relation sets a new relfilenode twice in same transaction, yet
2295 * the second one fails in an aborted subtransaction, e.g.
2304 * Also, if the target file is new-in-transaction, we assume that checking
2305 * FSM for free space is a waste of time, even if we must use WAL because
2306 * of archiving. This could possibly be wrong, but it's unlikely.
2308 * The comments for heap_insert and RelationGetBufferForTuple specify that
2309 * skipping WAL logging is only safe if we ensure that our tuples do not
2310 * go into pages containing tuples from any other transactions --- but this
2311 * must be the case if we have a new table or new relfilenode, so we need
2312 * no additional work to enforce that.
2315 /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2316 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2317 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2319 hi_options |= HEAP_INSERT_SKIP_FSM;
2320 if (!XLogIsNeeded())
2321 hi_options |= HEAP_INSERT_SKIP_WAL;
2325 * Optimize if new relfilenode was created in this subxact or one of its
2326 * committed children and we won't see those rows later as part of an
2327 * earlier scan or command. This ensures that if this subtransaction
2328 * aborts then the frozen rows won't be visible after xact cleanup. Note
2329 * that the stronger test of exactly which subtransaction created it is
2330 * crucial for correctness of this optimisation.
2334 if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2336 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2337 errmsg("cannot perform FREEZE because of prior transaction activity")));
2339 if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2340 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2342 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2343 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2345 hi_options |= HEAP_INSERT_FROZEN;
2349 * We need a ResultRelInfo so we can use the regular executor's
2350 * index-entry-making machinery. (There used to be a huge amount of code
2351 * here that basically duplicated execUtils.c ...)
2353 resultRelInfo = makeNode(ResultRelInfo);
2354 InitResultRelInfo(resultRelInfo,
2356 1, /* dummy rangetable index */
2359 ExecOpenIndices(resultRelInfo, false);
2361 estate->es_result_relations = resultRelInfo;
2362 estate->es_num_result_relations = 1;
2363 estate->es_result_relation_info = resultRelInfo;
2364 estate->es_range_table = cstate->range_table;
2366 /* Set up a tuple slot too */
2367 myslot = ExecInitExtraTupleSlot(estate);
2368 ExecSetSlotDescriptor(myslot, tupDesc);
2369 /* Triggers might need a slot as well */
2370 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2373 * It's more efficient to prepare a bunch of tuples for insertion, and
2374 * insert them in one heap_multi_insert() call, than call heap_insert()
2375 * separately for every tuple. However, we can't do that if there are
2376 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2377 * expressions. Such triggers or expressions might query the table we're
2378 * inserting to, and act differently if the tuples that have already been
2379 * processed and prepared for insertion are not there.
2381 if ((resultRelInfo->ri_TrigDesc != NULL &&
2382 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2383 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2384 cstate->volatile_defexprs)
2386 useHeapMultiInsert = false;
2390 useHeapMultiInsert = true;
2391 bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2394 /* Prepare to catch AFTER triggers. */
2395 AfterTriggerBeginQuery();
2398 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2399 * should do this for COPY, since it's not really an "INSERT" statement as
2400 * such. However, executing these triggers maintains consistency with the
2401 * EACH ROW triggers that we already fire on COPY.
2403 ExecBSInsertTriggers(estate, resultRelInfo);
2405 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2406 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2408 bistate = GetBulkInsertState();
2409 econtext = GetPerTupleExprContext(estate);
2411 /* Set up callback to identify error line number */
2412 errcallback.callback = CopyFromErrorCallback;
2413 errcallback.arg = (void *) cstate;
2414 errcallback.previous = error_context_stack;
2415 error_context_stack = &errcallback;
2419 TupleTableSlot *slot;
2421 Oid loaded_oid = InvalidOid;
2423 CHECK_FOR_INTERRUPTS();
2425 if (nBufferedTuples == 0)
2428 * Reset the per-tuple exprcontext. We can only do this if the
2429 * tuple buffer is empty. (Calling the context the per-tuple
2430 * memory context is a bit of a misnomer now.)
2432 ResetPerTupleExprContext(estate);
2435 /* Switch into its memory context */
2436 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2438 if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2441 /* And now we can form the input tuple. */
2442 tuple = heap_form_tuple(tupDesc, values, nulls);
2444 if (loaded_oid != InvalidOid)
2445 HeapTupleSetOid(tuple, loaded_oid);
2448 * Constraints might reference the tableoid column, so initialize
2449 * t_tableOid before evaluating them.
2451 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2453 /* Triggers and stuff need to be invoked in query context. */
2454 MemoryContextSwitchTo(oldcontext);
2456 /* Place tuple in tuple slot --- but slot shouldn't free it */
2458 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2462 /* BEFORE ROW INSERT Triggers */
2463 if (resultRelInfo->ri_TrigDesc &&
2464 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2466 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2468 if (slot == NULL) /* "do nothing" */
2470 else /* trigger might have changed tuple */
2471 tuple = ExecMaterializeSlot(slot);
2476 /* Check the constraints of the tuple */
2477 if (cstate->rel->rd_att->constr)
2478 ExecConstraints(resultRelInfo, slot, estate);
2480 if (useHeapMultiInsert)
2482 /* Add this tuple to the tuple buffer */
2483 if (nBufferedTuples == 0)
2484 firstBufferedLineNo = cstate->cur_lineno;
2485 bufferedTuples[nBufferedTuples++] = tuple;
2486 bufferedTuplesSize += tuple->t_len;
2489 * If the buffer filled up, flush it. Also flush if the total
2490 * size of all the tuples in the buffer becomes large, to
2491 * avoid using large amounts of memory for the buffers when
2492 * the tuples are exceptionally wide.
2494 if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2495 bufferedTuplesSize > 65535)
2497 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2498 resultRelInfo, myslot, bistate,
2499 nBufferedTuples, bufferedTuples,
2500 firstBufferedLineNo);
2501 nBufferedTuples = 0;
2502 bufferedTuplesSize = 0;
2507 List *recheckIndexes = NIL;
2509 /* OK, store the tuple and create index entries for it */
2510 heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
2512 if (resultRelInfo->ri_NumIndices > 0)
2513 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
2514 estate, false, NULL,
2517 /* AFTER ROW INSERT Triggers */
2518 ExecARInsertTriggers(estate, resultRelInfo, tuple,
2521 list_free(recheckIndexes);
2525 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2526 * this is the same definition used by execMain.c for counting
2527 * tuples inserted by an INSERT command.
2533 /* Flush any remaining buffered tuples */
2534 if (nBufferedTuples > 0)
2535 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2536 resultRelInfo, myslot, bistate,
2537 nBufferedTuples, bufferedTuples,
2538 firstBufferedLineNo);
2540 /* Done, clean up */
2541 error_context_stack = errcallback.previous;
2543 FreeBulkInsertState(bistate);
2545 MemoryContextSwitchTo(oldcontext);
2548 * In the old protocol, tell pqcomm that we can process normal protocol
2551 if (cstate->copy_dest == COPY_OLD_FE)
2554 /* Execute AFTER STATEMENT insertion triggers */
2555 ExecASInsertTriggers(estate, resultRelInfo);
2557 /* Handle queued AFTER triggers */
2558 AfterTriggerEndQuery(estate);
2563 ExecResetTupleTable(estate->es_tupleTable, false);
2565 ExecCloseIndices(resultRelInfo);
2567 FreeExecutorState(estate);
2570 * If we skipped writing WAL, then we need to sync the heap (but not
2571 * indexes since those use WAL anyway)
2573 if (hi_options & HEAP_INSERT_SKIP_WAL)
2574 heap_sync(cstate->rel);
2580 * A subroutine of CopyFrom, to write the current batch of buffered heap
2581 * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2585 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2586 int hi_options, ResultRelInfo *resultRelInfo,
2587 TupleTableSlot *myslot, BulkInsertState bistate,
2588 int nBufferedTuples, HeapTuple *bufferedTuples,
2589 int firstBufferedLineNo)
2591 MemoryContext oldcontext;
2593 int save_cur_lineno;
2596 * Print error context information correctly, if one of the operations
2599 cstate->line_buf_valid = false;
2600 save_cur_lineno = cstate->cur_lineno;
2603 * heap_multi_insert leaks memory, so switch to short-lived memory context
2604 * before calling it.
2606 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2607 heap_multi_insert(cstate->rel,
2613 MemoryContextSwitchTo(oldcontext);
2616 * If there are any indexes, update them for all the inserted tuples, and
2617 * run AFTER ROW INSERT triggers.
2619 if (resultRelInfo->ri_NumIndices > 0)
2621 for (i = 0; i < nBufferedTuples; i++)
2623 List *recheckIndexes;
2625 cstate->cur_lineno = firstBufferedLineNo + i;
2626 ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2628 ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2629 estate, false, NULL, NIL);
2630 ExecARInsertTriggers(estate, resultRelInfo,
2633 list_free(recheckIndexes);
2638 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
2641 else if (resultRelInfo->ri_TrigDesc != NULL &&
2642 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
2644 for (i = 0; i < nBufferedTuples; i++)
2646 cstate->cur_lineno = firstBufferedLineNo + i;
2647 ExecARInsertTriggers(estate, resultRelInfo,
2653 /* reset cur_lineno to where we were */
2654 cstate->cur_lineno = save_cur_lineno;
2658 * Setup to read tuples from a file for COPY FROM.
2660 * 'rel': Used as a template for the tuples
2661 * 'filename': Name of server-local file to read
2662 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2663 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2665 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
/*
 * BeginCopyFrom: build the CopyState that NextCopyFrom() consumes.
 *
 * Steps visible below: call BeginCopy(), switch into cstate->copycontext,
 * reset the line/attribute bookkeeping, allocate the raw input buffer, look
 * up an input function for every non-dropped column, plan a default
 * expression for each column that is not in cstate->attnumlist, open the
 * data source, read and validate the binary header when in binary mode,
 * size the raw-field workspace for text/CSV mode, and finally switch back to
 * the caller's memory context.
 *
 * NOTE(review): this listing is missing its short lines (braces, "else",
 * "ereport(ERROR," openers, brief declarations and statements), so the
 * comments added here describe only what the visible lines establish.
 */
2668 BeginCopyFrom(ParseState *pstate,
2670 const char *filename,
/* A NULL filename selects the "pipe" case (no server-side file name). */
2676 bool pipe = (filename == NULL);
2678 Form_pg_attribute *attr;
2679 AttrNumber num_phys_attrs,
2681 FmgrInfo *in_functions;
2686 ExprState **defexprs;
2687 MemoryContext oldcontext;
2688 bool volatile_defexprs;
2690 cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
/* Everything allocated from here until the final switch-back lives in copycontext. */
2691 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2693 /* Initialize state variables */
2694 cstate->fe_eof = false;
2695 cstate->eol_type = EOL_UNKNOWN;
2696 cstate->cur_relname = RelationGetRelationName(cstate->rel);
2697 cstate->cur_lineno = 0;
2698 cstate->cur_attname = NULL;
2699 cstate->cur_attval = NULL;
2701 /* Set up variables to avoid per-attribute overhead. */
2702 initStringInfo(&cstate->attribute_buf);
2703 initStringInfo(&cstate->line_buf);
2704 cstate->line_buf_converted = false;
/* One byte more than RAW_BUF_SIZE is allocated; the line reader's comments refer to a guaranteed pad byte in raw_buf. */
2705 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2706 cstate->raw_buf_index = cstate->raw_buf_len = 0;
2708 tupDesc = RelationGetDescr(cstate->rel);
2709 attr = tupDesc->attrs;
2710 num_phys_attrs = tupDesc->natts;
2712 volatile_defexprs = false;
2715 * Pick up the required catalog information for each attribute in the
2716 * relation, including the input function, the element type (to pass to
2717 * the input function), and info about defaults and constraints. (Which
2718 * input function we use depends on text/binary format choice.)
/* All four work arrays are sized by the physical column count; in_functions and typioparams are indexed by attnum - 1. */
2720 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2721 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
2722 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
2723 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
2725 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
2727 /* We don't need info for dropped attributes */
2728 if (attr[attnum - 1]->attisdropped)
2731 /* Fetch the input function and typioparam info */
2733 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
2734 &in_func_oid, &typioparams[attnum - 1]);
2736 getTypeInputInfo(attr[attnum - 1]->atttypid,
2737 &in_func_oid, &typioparams[attnum - 1]);
2738 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
2740 /* Get default info if needed */
2741 if (!list_member_int(cstate->attnumlist, attnum))
2743 /* attribute is NOT to be copied from input */
2744 /* use default value if one exists */
2745 Expr *defexpr = (Expr *) build_column_default(cstate->rel,
2748 if (defexpr != NULL)
2750 /* Run the expression through planner */
2751 defexpr = expression_planner(defexpr);
/* defexprs[] and defmap[] are filled in step: slot num_defaults holds the expression and the zero-based column it feeds. NOTE(review): the increment of num_defaults is not visible in this listing. */
2753 /* Initialize executable expression in copycontext */
2754 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
2755 defmap[num_defaults] = attnum - 1;
2759 * If a default expression looks at the table being loaded,
2760 * then it could give the wrong answer when using
2761 * multi-insert. Since database access can be dynamic this is
2762 * hard to test for exactly, so we use the much wider test of
2763 * whether the default expression is volatile. We allow for
2764 * the special case of when the default expression is the
2765 * nextval() of a sequence which in this specific case is
2766 * known to be safe for use with the multi-insert
2767 * optimisation. Hence we use this special case function
2768 * checker rather than the standard check for
2769 * contain_volatile_functions().
/* The flag is sticky: once one default tests volatile, later defaults are not re-tested. */
2771 if (!volatile_defexprs)
2772 volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
2777 /* We keep those variables in cstate. */
2778 cstate->in_functions = in_functions;
2779 cstate->typioparams = typioparams;
2780 cstate->defmap = defmap;
2781 cstate->defexprs = defexprs;
2782 cstate->volatile_defexprs = volatile_defexprs;
2783 cstate->num_defaults = num_defaults;
2784 cstate->is_program = is_program;
/* Source selection. NOTE(review): the branch headers are not visible; the three cases appear to be client/stdin, piped program, and server-side file. */
2788 Assert(!is_program); /* the grammar does not allow this */
2789 if (whereToSendOutput == DestRemote)
2790 ReceiveCopyBegin(cstate);
2792 cstate->copy_file = stdin;
2796 cstate->filename = pstrdup(filename);
2798 if (cstate->is_program)
2800 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
2801 if (cstate->copy_file == NULL)
2803 (errcode_for_file_access(),
2804 errmsg("could not execute command \"%s\": %m",
2805 cstate->filename)));
2811 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
2812 if (cstate->copy_file == NULL)
2814 (errcode_for_file_access(),
2815 errmsg("could not open file \"%s\" for reading: %m",
2816 cstate->filename)));
/* After a successful open, the file is stat'ed and a directory is rejected explicitly. */
2818 if (fstat(fileno(cstate->copy_file), &st))
2820 (errcode_for_file_access(),
2821 errmsg("could not stat file \"%s\": %m",
2822 cstate->filename)));
2824 if (S_ISDIR(st.st_mode))
2826 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2827 errmsg("\"%s\" is a directory", cstate->filename)));
2831 if (!cstate->binary)
2833 /* must rely on user to tell us... */
2834 cstate->file_has_oids = cstate->oids;
2838 /* Read and verify binary header */
/* The header starts with an 11-byte signature that must equal BinarySignature. */
2843 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
2844 memcmp(readSig, BinarySignature, 11) != 0)
2846 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2847 errmsg("COPY file signature not recognized")));
2849 if (!CopyGetInt32(cstate, &tmp))
2851 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2852 errmsg("invalid COPY file header (missing flags)")));
/* Bit 16 of the flags word says whether the file carries OIDs. */
2853 cstate->file_has_oids = (tmp & (1 << 16)) != 0;
/* Any bit at or above bit 16 still set here is reported as an unrecognized critical flag. NOTE(review): the statement that clears bit 16 before this test is not visible in this listing -- confirm it is present. */
2855 if ((tmp >> 16) != 0)
2857 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2858 errmsg("unrecognized critical flags in COPY file header")));
2859 /* Header extension length */
2860 if (!CopyGetInt32(cstate, &tmp) ||
2863 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2864 errmsg("invalid COPY file header (missing length)")));
2865 /* Skip extension header, if present */
/* Extension bytes are read one at a time into readSig and discarded. NOTE(review): the enclosing loop header is not visible. */
2868 if (CopyGetData(cstate, readSig, 1, 1) != 1)
2870 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2871 errmsg("invalid COPY file header (wrong length)")));
/* Binary files that carry OIDs need the binary input function for type OID. */
2875 if (cstate->file_has_oids && cstate->binary)
2877 getTypeBinaryInputInfo(OIDOID,
2878 &in_func_oid, &cstate->oid_typioparam);
2879 fmgr_info(in_func_oid, &cstate->oid_in_function);
2882 /* create workspace for CopyReadAttributes results */
2883 if (!cstate->binary)
/* Expected field count is the number of listed columns, plus one when the file carries OIDs. */
2885 AttrNumber attr_count = list_length(cstate->attnumlist);
2886 int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
2888 cstate->max_fields = nfields;
2889 cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
/* Restore the memory context that was current on entry. */
2892 MemoryContextSwitchTo(oldcontext);
2898 * Read raw fields in the next line for COPY FROM in text or csv mode.
2899 * Return false if no more lines.
2901 * An internal temporary buffer is returned via 'fields'. It is valid until
2902 * the next call of the function. Since the function returns all raw fields
2903 * in the input file, 'nfields' could be different from the number of columns
2906 * NOTE: force_not_null option are not applied to the returned fields.
2909 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
2914 /* only available for text or csv input */
2915 Assert(!cstate->binary);
2917 /* on input just throw the header line away */
2918 if (cstate->cur_lineno == 0 && cstate->header_line)
2920 cstate->cur_lineno++;
2921 if (CopyReadLine(cstate))
2922 return false; /* done */
2925 cstate->cur_lineno++;
2927 /* Actually read the line into memory here */
2928 done = CopyReadLine(cstate);
2931 * EOF at start of line means we're done. If we see EOF after some
2932 * characters, we act as though it was newline followed by EOF, ie,
2933 * process the line and then exit loop on next iteration.
2935 if (done && cstate->line_buf.len == 0)
2938 /* Parse the line into de-escaped field values */
2939 if (cstate->csv_mode)
2940 fldct = CopyReadAttributesCSV(cstate);
2942 fldct = CopyReadAttributesText(cstate);
2944 *fields = cstate->raw_fields;
2950 * Read next tuple from file for COPY FROM. Return false if no more tuples.
2952 * 'econtext' is used to evaluate the default expression for each column not
2953 * read from the file. It can be NULL when no default values are used, i.e.
2954 * when all columns are read from the file.
2956 * 'values' and 'nulls' arrays must be the same length as columns of the
2957 * relation passed to BeginCopyFrom. This function fills the arrays.
2958 * Oid of the tuple is returned with 'tupleOid' separately.
/*
 * NextCopyFrom: fill values[] and nulls[] for one input row.
 *
 * Both arrays are first reset so that every physical column starts out NULL.
 * The text/CSV path or the binary path then fills in the columns listed in
 * cstate->attnumlist, and finally each default expression recorded in
 * cstate->defexprs is evaluated into the column given by cstate->defmap.
 *
 * NOTE(review): this listing is missing its short lines (braces, "else",
 * "return false;", brief declarations such as the per-column index "m"), so
 * the comments added here describe only what the visible lines establish.
 */
2961 NextCopyFrom(CopyState cstate, ExprContext *econtext,
2962 Datum *values, bool *nulls, Oid *tupleOid)
2965 Form_pg_attribute *attr;
2966 AttrNumber num_phys_attrs,
2968 num_defaults = cstate->num_defaults;
2969 FmgrInfo *in_functions = cstate->in_functions;
2970 Oid *typioparams = cstate->typioparams;
2974 bool file_has_oids = cstate->file_has_oids;
2975 int *defmap = cstate->defmap;
2976 ExprState **defexprs = cstate->defexprs;
2978 tupDesc = RelationGetDescr(cstate->rel);
2979 attr = tupDesc->attrs;
2980 num_phys_attrs = tupDesc->natts;
2981 attr_count = list_length(cstate->attnumlist);
/* Expected input fields: one per listed column, plus a leading OID field when the file carries OIDs. */
2982 nfields = file_has_oids ? (attr_count + 1) : attr_count;
2984 /* Initialize all values for row to NULL */
2985 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
2986 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
2988 if (!cstate->binary)
2990 char **field_strings;
2996 /* read raw fields in the next line */
2997 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
/* The nfields > 0 guard exempts the zero-expected-fields case from this check. */
3000 /* check for overflowing fields */
3001 if (nfields > 0 && fldct > nfields)
3003 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3004 errmsg("extra data after last expected column")));
3008 /* Read the OID field if present */
3011 if (fieldno >= fldct)
3013 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3014 errmsg("missing data for OID column")));
3015 string = field_strings[fieldno++];
3019 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3020 errmsg("null OID in COPY data")));
/* The OID text is parsed only when the COPY uses OIDS and the caller supplied a place to store it. */
3021 else if (cstate->oids && tupleOid != NULL)
3023 cstate->cur_attname = "oid";
3024 cstate->cur_attval = string;
3025 *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
3026 CStringGetDatum(string)));
3027 if (*tupleOid == InvalidOid)
3029 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3030 errmsg("invalid OID in COPY data")));
3031 cstate->cur_attname = NULL;
3032 cstate->cur_attval = NULL;
3036 /* Loop to read the user attributes on the line. */
3037 foreach(cur, cstate->attnumlist)
3039 int attnum = lfirst_int(cur);
/* Running out of parsed fields before running out of listed columns is an error. */
3042 if (fieldno >= fldct)
3044 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3045 errmsg("missing data for column \"%s\"",
3046 NameStr(attr[m]->attname))));
3047 string = field_strings[fieldno++];
/* When convert_select_flags is set, a false entry means this column's input is discarded. */
3049 if (cstate->convert_select_flags &&
3050 !cstate->convert_select_flags[m])
3052 /* ignore input field, leaving column as NULL */
3056 if (cstate->csv_mode)
3058 if (string == NULL &&
3059 cstate->force_notnull_flags[m])
3062 * FORCE_NOT_NULL option is set and column is NULL -
3063 * convert it to the NULL string.
3065 string = cstate->null_print;
3067 else if (string != NULL && cstate->force_null_flags[m]
3068 && strcmp(string, cstate->null_print) == 0)
3071 * FORCE_NULL option is set and column matches the NULL
3072 * string. It must have been quoted, or otherwise the
3073 * string would already have been set to NULL. Convert it
3074 * to NULL as specified.
/* cur_attname/cur_attval are set around the input-function call and cleared afterwards -- presumably so the error context callback can name the offending column; confirm against CopyFromErrorCallback. */
3080 cstate->cur_attname = NameStr(attr[m]->attname);
3081 cstate->cur_attval = string;
3082 values[m] = InputFunctionCall(&in_functions[m],
3085 attr[m]->atttypmod);
3088 cstate->cur_attname = NULL;
3089 cstate->cur_attval = NULL;
3092 Assert(fieldno == nfields);
/* Binary path: each tuple begins with a 16-bit field count. */
3100 cstate->cur_lineno++;
3102 if (!CopyGetInt16(cstate, &fld_count))
3104 /* EOF detected (end of file, or protocol-level EOF) */
/* A field count of -1 is the end-of-data marker. */
3108 if (fld_count == -1)
3111 * Received EOF marker. In a V3-protocol copy, wait for the
3112 * protocol-level EOF, and complain if it doesn't come
3113 * immediately. This ensures that we correctly handle CopyFail,
3114 * if client chooses to send that now.
3116 * Note that we MUST NOT try to read more data in an old-protocol
3117 * copy, since there is no protocol-level EOF marker then. We
3118 * could go either way for copy from file, but choose to throw
3119 * error if there's data after the EOF marker, for consistency
3120 * with the new-protocol case.
3124 if (cstate->copy_dest != COPY_OLD_FE &&
3125 CopyGetData(cstate, &dummy, 1, 1) > 0)
3127 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3128 errmsg("received copy data after EOF marker")));
/* The per-tuple count is compared with attr_count, i.e. it does not include the OID field. */
3132 if (fld_count != attr_count)
3134 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3135 errmsg("row field count is %d, expected %d",
3136 (int) fld_count, attr_count)));
3142 cstate->cur_attname = "oid";
3144 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
3146 &cstate->oid_in_function,
3147 cstate->oid_typioparam,
3150 if (isnull || loaded_oid == InvalidOid)
3152 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3153 errmsg("invalid OID in COPY data")));
3154 cstate->cur_attname = NULL;
/* The OID is always read and validated, but only handed back when the COPY uses OIDS and the caller asked for it. */
3155 if (cstate->oids && tupleOid != NULL)
3156 *tupleOid = loaded_oid;
3160 foreach(cur, cstate->attnumlist)
3162 int attnum = lfirst_int(cur);
3165 cstate->cur_attname = NameStr(attr[m]->attname);
3167 values[m] = CopyReadBinaryAttribute(cstate,
3173 cstate->cur_attname = NULL;
3178 * Now compute and insert any defaults available for the columns not
3179 * provided by the input data. Anything not processed here or above will
3182 for (i = 0; i < num_defaults; i++)
3185 * The caller must supply econtext and have switched into the
3186 * per-tuple memory context in it.
3188 Assert(econtext != NULL);
3189 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
/* defmap[i] is the zero-based column that default expression i feeds; both the value and its null flag are written. */
3191 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3192 &nulls[defmap[i]], NULL);
3199 * Clean up storage and release resources for COPY FROM.
3202 EndCopyFrom(CopyState cstate)
3204 /* No COPY FROM related resources except memory. */
3210 * Read the next input line and stash it in line_buf, with conversion to
3213 * Result is true if read was terminated by EOF, false if terminated
3214 * by newline. The terminating newline or EOF marker is not included
3215 * in the final value of line_buf.
3218 CopyReadLine(CopyState cstate)
3222 resetStringInfo(&cstate->line_buf);
3223 cstate->line_buf_valid = true;
3225 /* Mark that encoding conversion hasn't occurred yet */
3226 cstate->line_buf_converted = false;
3228 /* Parse data and transfer into line_buf */
3229 result = CopyReadLineText(cstate);
3234 * Reached EOF. In protocol version 3, we should ignore anything
3235 * after \. up to the protocol end of copy data. (XXX maybe better
3236 * not to treat \. as special?)
3238 if (cstate->copy_dest == COPY_NEW_FE)
3242 cstate->raw_buf_index = cstate->raw_buf_len;
3243 } while (CopyLoadRawBuf(cstate));
3249 * If we didn't hit EOF, then we must have transferred the EOL marker
3250 * to line_buf along with the data. Get rid of it.
3252 switch (cstate->eol_type)
3255 Assert(cstate->line_buf.len >= 1);
3256 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3257 cstate->line_buf.len--;
3258 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3261 Assert(cstate->line_buf.len >= 1);
3262 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3263 cstate->line_buf.len--;
3264 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3267 Assert(cstate->line_buf.len >= 2);
3268 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3269 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3270 cstate->line_buf.len -= 2;
3271 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3274 /* shouldn't get here */
3280 /* Done reading the line. Convert it to server encoding. */
3281 if (cstate->need_transcoding)
3285 cvt = pg_any_to_server(cstate->line_buf.data,
3286 cstate->line_buf.len,
3287 cstate->file_encoding);
3288 if (cvt != cstate->line_buf.data)
3290 /* transfer converted data back to line_buf */
3291 resetStringInfo(&cstate->line_buf);
3292 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3297 /* Now it's safe to use the buffer in error messages */
3298 cstate->line_buf_converted = true;
3304 * CopyReadLineText - inner loop of CopyReadLine for text mode
/*
 * CopyReadLineText: scan raw_buf for the end of the next input line and
 * move the line's bytes into line_buf.
 *
 * The loop below looks at one byte at a time, tracking CSV quote state when
 * csv_mode is set, recognising \r, \n and \r\n terminators (and fixing
 * cstate->eol_type the first time one is seen), and recognising the
 * backslash-period end-of-copy marker.  "result" is set to true when the
 * end-of-copy marker is accepted.
 *
 * NOTE(review): this listing is missing its short lines (braces, "break;",
 * "continue;", goto labels, brief declarations, and the final transfer and
 * return), and the IF_NEED_REFILL_* / NO_END_OF_COPY_GOTO macros are defined
 * elsewhere.  The comments added here describe only what the visible lines
 * establish.
 */
3307 CopyReadLineText(CopyState cstate)
3312 bool need_data = false;
3313 bool hit_eof = false;
3314 bool result = false;
3318 bool first_char_in_line = true;
3319 bool in_quote = false,
3320 last_was_esc = false;
3322 char escapec = '\0';
3324 if (cstate->csv_mode)
/* Only the first byte of the configured quote and escape strings is used. */
3326 quotec = cstate->quote[0];
3327 escapec = cstate->escape[0];
3328 /* ignore special escape processing if it's the same as quotec */
3329 if (quotec == escapec)
/* mblen_str[1] is terminated once here; the buffer is later handed to pg_encoding_mblen(). */
3333 mblen_str[1] = '\0';
3336 * The objective of this loop is to transfer the entire next input line
3337 * into line_buf. Hence, we only care for detecting newlines (\r and/or
3338 * \n) and the end-of-copy marker (\.).
3340 * In CSV mode, \r and \n inside a quoted field are just part of the data
3341 * value and are put in line_buf. We keep just enough state to know if we
3342 * are currently in a quoted field or not.
3344 * These four characters, and the CSV escape and quote characters, are
3345 * assumed the same in frontend and backend encodings.
3347 * For speed, we try to move data from raw_buf to line_buf in chunks
3348 * rather than one character at a time. raw_buf_ptr points to the next
3349 * character to examine; any characters from raw_buf_index to raw_buf_ptr
3350 * have been determined to be part of the line, but not yet transferred to
3353 * For a little extra speed within the loop, we copy raw_buf and
3354 * raw_buf_len into local variables.
3356 copy_raw_buf = cstate->raw_buf;
3357 raw_buf_ptr = cstate->raw_buf_index;
3358 copy_buf_len = cstate->raw_buf_len;
3366 * Load more data if needed. Ideally we would just force four bytes
3367 * of read-ahead and avoid the many calls to
3368 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3369 * does not allow us to read too far ahead or we might read into the
3370 * next data, so we read-ahead only as far we know we can. One
3371 * optimization would be to read-ahead four byte here if
3372 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3373 * considering the size of the buffer.
/* A refill happens either when the scan pointer has reached the end of the buffered data or when a look-ahead asked for more via need_data. */
3375 if (raw_buf_ptr >= copy_buf_len || need_data)
3380 * Try to read some more data. This will certainly reset
3381 * raw_buf_index to zero, and raw_buf_ptr must go with it.
3383 if (!CopyLoadRawBuf(cstate))
/* The cached length must be refreshed after every load. */
3386 copy_buf_len = cstate->raw_buf_len;
3389 * If we are completely out of data, break out of the loop,
3392 if (copy_buf_len <= 0)
3400 /* OK to fetch a character */
/* prev_raw_ptr remembers where this character started, so an end-of-copy marker can later be excluded from the line. */
3401 prev_raw_ptr = raw_buf_ptr;
3402 c = copy_raw_buf[raw_buf_ptr++];
3404 if (cstate->csv_mode)
3407 * If character is '\\' or '\r', we may need to look ahead below.
3408 * Force fetch of the next character if we don't already have it.
3409 * We need to do this before changing CSV state, in case one of
3410 * these characters is also the quote or escape character.
3412 * Note: old-protocol does not like forced prefetch, but it's OK
3413 * here since we cannot validly be at EOF.
3415 if (c == '\\' || c == '\r')
3417 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3421 * Dealing with quotes and escapes here is mildly tricky. If the
3422 * quote char is also the escape char, there's no problem - we
3423 * just use the char as a toggle. If they are different, we need
3424 * to ensure that we only take account of an escape inside a
3425 * quoted field and immediately preceding a quote char, and not
3426 * the second in an escape-escape sequence.
3428 if (in_quote && c == escapec)
3429 last_was_esc = !last_was_esc;
3430 if (c == quotec && !last_was_esc)
3431 in_quote = !in_quote;
3433 last_was_esc = false;
3436 * Updating the line count for embedded CR and/or LF chars is
3437 * necessarily a little fragile - this test is probably about the
3438 * best we can do. (XXX it's arguable whether we should do this
3439 * at all --- is cur_lineno a physical or logical count?)
/* The character counted as an embedded line break is '\n' when eol_type is EOL_NL and '\r' for every other eol_type. */
3441 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3442 cstate->cur_lineno++;
/* A carriage return ends the line only outside CSV quotes. */
3446 if (c == '\r' && (!cstate->csv_mode || !in_quote))
3448 /* Check for \r\n on first line, _and_ handle \r\n. */
3449 if (cstate->eol_type == EOL_UNKNOWN ||
3450 cstate->eol_type == EOL_CRNL)
3453 * If need more data, go back to loop top to load it.
3455 * Note that if we are at EOF, c will wind up as '\0' because
3456 * of the guaranteed pad of raw_buf.
3458 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
/* Peek at the following byte without consuming it. */
3461 c = copy_raw_buf[raw_buf_ptr];
3465 raw_buf_ptr++; /* eat newline */
3466 cstate->eol_type = EOL_CRNL; /* in case not set yet */
3470 /* found \r, but no \n */
3471 if (cstate->eol_type == EOL_CRNL)
3473 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3475 errmsg("literal carriage return found in data") :
3476 errmsg("unquoted carriage return found in data"),
3478 errhint("Use \"\\r\" to represent carriage return.") :
3479 errhint("Use quoted CSV field to represent carriage return.")));
3482 * if we got here, it is the first line and we didn't find
3483 * \n, so don't consume the peeked character
3485 cstate->eol_type = EOL_CR;
3488 else if (cstate->eol_type == EOL_NL)
3490 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3492 errmsg("literal carriage return found in data") :
3493 errmsg("unquoted carriage return found in data"),
3495 errhint("Use \"\\r\" to represent carriage return.") :
3496 errhint("Use quoted CSV field to represent carriage return.")));
3497 /* If reach here, we have found the line terminator */
/* A newline ends the line only outside CSV quotes, and is an error once a CR-based line ending has been established. */
3502 if (c == '\n' && (!cstate->csv_mode || !in_quote))
3504 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3506 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3508 errmsg("literal newline found in data") :
3509 errmsg("unquoted newline found in data"),
3511 errhint("Use \"\\n\" to represent newline.") :
3512 errhint("Use quoted CSV field to represent newline.")));
3513 cstate->eol_type = EOL_NL; /* in case not set yet */
3514 /* If reach here, we have found the line terminator */
3519 * In CSV mode, we only recognize \. alone on a line. This is because
3520 * \. is a valid CSV data value.
3522 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3526 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3527 IF_NEED_REFILL_AND_EOF_BREAK(0);
3530 * get next character
3531 * Note: we do not change c so if it isn't \., we can fall
3532 * through and continue processing for file encoding.
3535 c2 = copy_raw_buf[raw_buf_ptr];
3539 raw_buf_ptr++; /* consume the '.' */
3542 * Note: if we loop back for more data here, it does not
3543 * matter that the CSV state change checks are re-executed; we
3544 * will come back here with no important state changed.
/* With CRNL line endings the marker must be followed by '\r' and then '\n', so one extra byte is examined here. */
3546 if (cstate->eol_type == EOL_CRNL)
3548 /* Get the next character */
3549 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3550 /* if hit_eof, c2 will become '\0' */
3551 c2 = copy_raw_buf[raw_buf_ptr++];
/* In each mismatch case below, text mode raises an error while CSV mode invokes NO_END_OF_COPY_GOTO. */
3555 if (!cstate->csv_mode)
3557 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3558 errmsg("end-of-copy marker does not match previous newline style")));
3560 NO_END_OF_COPY_GOTO;
3562 else if (c2 != '\r')
3564 if (!cstate->csv_mode)
3566 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3567 errmsg("end-of-copy marker corrupt")));
3569 NO_END_OF_COPY_GOTO;
3573 /* Get the next character */
3574 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3575 /* if hit_eof, c2 will become '\0' */
3576 c2 = copy_raw_buf[raw_buf_ptr++];
3578 if (c2 != '\r' && c2 != '\n')
3580 if (!cstate->csv_mode)
3582 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3583 errmsg("end-of-copy marker corrupt")));
3585 NO_END_OF_COPY_GOTO;
/* The byte that terminates the marker must agree with the line-ending style already established. */
3588 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3589 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3590 (cstate->eol_type == EOL_CR && c2 != '\r'))
3593 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3594 errmsg("end-of-copy marker does not match previous newline style")));
3598 * Transfer only the data before the \. into line_buf, then
3599 * discard the data and the \. sequence.
3601 if (prev_raw_ptr > cstate->raw_buf_index)
3602 appendBinaryStringInfo(&cstate->line_buf,
3603 cstate->raw_buf + cstate->raw_buf_index,
3604 prev_raw_ptr - cstate->raw_buf_index);
3605 cstate->raw_buf_index = raw_buf_ptr;
3606 result = true; /* report EOF */
3609 else if (!cstate->csv_mode)
3612 * If we are here, it means we found a backslash followed by
3613 * something other than a period. In non-CSV mode, anything
3614 * after a backslash is special, so we skip over that second
3615 * character too. If we didn't do that \\. would be
3616 * considered an eof-of copy, while in non-CSV mode it is a
3617 * literal backslash followed by a period. In CSV mode,
3618 * backslashes are not special, so we want to process the
3619 * character after the backslash just like a normal character,
3620 * so we don't increment in those cases.
3626 * This label is for CSV cases where \. appears at the start of a
3627 * line, but there is more text after it, meaning it was a data value.
3628 * We are more strict for \. in CSV mode because \. could be a data
3629 * value, while in non-CSV mode, \. cannot be a data value.
3634 * Process all bytes of a multi-byte character as a group.
3636 * We only support multi-byte sequences where the first byte has the
3637 * high-bit set, so as an optimization we can avoid this block
3638 * entirely if it is not set.
/* The multi-byte skip is taken only when encoding_embeds_ascii is set and this byte has its high bit set. */
3640 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3645 /* All our encodings only read the first byte to get the length */
3646 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
/* The remaining mblen - 1 bytes must be available before the pointer is advanced past them. */
3647 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
3648 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3649 raw_buf_ptr += mblen - 1;
3651 first_char_in_line = false;
3652 } /* end of outer loop */
3655 * Transfer any still-uncopied data to line_buf.
/*
 * GetDecimalFromHex: map one hexadecimal digit character to its value.
 *
 * hex is cast to unsigned char before being handed to isdigit() and
 * tolower().  On the visible non-digit path the character is lowercased
 * and its distance from the letter a is added to 10.  No validation of
 * the argument is visible inside this function; the visible call sites in
 * CopyReadAttributesText test the character with isxdigit() first.
 *
 * NOTE(review): the statement executed when isdigit() succeeds is not
 * visible in this listing; presumably it returns the digit value --
 * confirm against the full source.
 */
3663 * Return decimal value for a hexadecimal digit
3666 GetDecimalFromHex(char hex)
3668 if (isdigit((unsigned char) hex))
3671 return tolower((unsigned char) hex) - 'a' + 10;
/*
 * CopyReadAttributesText: split cstate->line_buf into fields for
 * text-format COPY input, de-escaping into cstate->attribute_buf.
 *
 * What the visible lines establish:
 *   - The delimiter is the first byte of cstate->delim.
 *   - When max_fields <= 0, a non-empty line is rejected with
 *     ERRCODE_BAD_COPY_FILE_FORMAT (extra data after last expected column).
 *   - attribute_buf is reset and, when its maxlen does not exceed the line
 *     length, enlarged once up front; field text is then written through
 *     output_ptr with no further space checks.
 *   - cstate->raw_fields is doubled with repalloc() whenever fieldno
 *     reaches max_fields, and each entry is pointed at the start of the
 *     output for that field.
 *   - Octal escapes accumulate with a 3-bit shift and hex escapes with a
 *     4-bit shift; saw_non_ascii is set when c is NUL or has its high bit
 *     set.
 *   - After a field is scanned, the raw bytes between start_ptr and
 *     end_ptr are compared with cstate->null_print (length and content);
 *     an exact match replaces the raw_fields entry with NULL.
 *   - Otherwise pg_verifymbstr() may be run over the de-escaped field, and
 *     the field is NUL-terminated in the output area.
 *   - On exit attribute_buf.len is set from output_ptr.
 *
 * NOTE(review): this listing omits many short lines (braces, loop headers,
 * break/else statements, the escape switch, the return).  Presumably the
 * pg_verifymbstr() call is guarded by saw_non_ascii and the function
 * returns the field count -- confirm against the full source.
 */
3675 * Parse the current line into separate attributes (fields),
3676 * performing de-escaping as needed.
3678 * The input is in line_buf. We use attribute_buf to hold the result
3679 * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3680 * string, or NULL when the input matches the null marker string.
3681 * This array is expanded as necessary.
3683 * (Note that the caller cannot check for nulls since the returned
3684 * string would be the post-de-escaping equivalent, which may look
3685 * the same as some valid data string.)
3687 * delim is the column delimiter string (must be just one byte for now).
3688 * null_print is the null marker string. Note that this is compared to
3689 * the pre-de-escaped input string.
3691 * The return value is the number of fields actually read.
3694 CopyReadAttributesText(CopyState cstate)
3696 char delimc = cstate->delim[0];
3703 * We need a special case for zero-column tables: check that the input
3704 * line is empty, and return.
3706 if (cstate->max_fields <= 0)
3708 if (cstate->line_buf.len != 0)
3710 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3711 errmsg("extra data after last expected column")));
3715 resetStringInfo(&cstate->attribute_buf);
3718 * The de-escaped attributes will certainly not be longer than the input
3719 * data line, so we can just force attribute_buf to be large enough and
3720 * then transfer data without any checks for enough space. We need to do
3721 * it this way because enlarging attribute_buf mid-stream would invalidate
3722 * pointers already stored into cstate->raw_fields[].
3724 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3725 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3726 output_ptr = cstate->attribute_buf.data;
3728 /* set pointer variables for loop */
3729 cur_ptr = cstate->line_buf.data;
3730 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3732 /* Outer loop iterates over fields */
3736 bool found_delim = false;
3740 bool saw_non_ascii = false;
3742 /* Make sure there is enough space for the next value */
3743 if (fieldno >= cstate->max_fields)
3745 cstate->max_fields *= 2;
3746 cstate->raw_fields =
3747 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3750 /* Remember start of field on both input and output sides */
3751 start_ptr = cur_ptr;
3752 cstate->raw_fields[fieldno] = output_ptr;
3755 * Scan data for field.
3757 * Note that in this loop, we are scanning to locate the end of field
3758 * and also speculatively performing de-escaping. Once we find the
3759 * end-of-field, we can match the raw field contents against the null
3760 * marker string. Only after that comparison fails do we know that
3761 * de-escaping is actually the right thing to do; therefore we *must
3762 * not* throw any syntax errors before we've done the null-marker
3770 if (cur_ptr >= line_end_ptr)
3780 if (cur_ptr >= line_end_ptr)
3798 if (cur_ptr < line_end_ptr)
3804 val = (val << 3) + OCTVALUE(c);
3805 if (cur_ptr < line_end_ptr)
3811 val = (val << 3) + OCTVALUE(c);
3817 if (c == '\0' || IS_HIGHBIT_SET(c))
3818 saw_non_ascii = true;
3823 if (cur_ptr < line_end_ptr)
3825 char hexchar = *cur_ptr;
3827 if (isxdigit((unsigned char) hexchar))
3829 int val = GetDecimalFromHex(hexchar);
3832 if (cur_ptr < line_end_ptr)
3835 if (isxdigit((unsigned char) hexchar))
3838 val = (val << 4) + GetDecimalFromHex(hexchar);
3842 if (c == '\0' || IS_HIGHBIT_SET(c))
3843 saw_non_ascii = true;
3867 * in all other cases, take the char after '\'
3873 /* Add c to output string */
3877 /* Check whether raw input matched null marker */
3878 input_len = end_ptr - start_ptr;
3879 if (input_len == cstate->null_print_len &&
3880 strncmp(start_ptr, cstate->null_print, input_len) == 0)
3881 cstate->raw_fields[fieldno] = NULL;
3885 * At this point we know the field is supposed to contain data.
3887 * If we de-escaped any non-7-bit-ASCII chars, make sure the
3888 * resulting string is valid data for the db encoding.
3892 char *fld = cstate->raw_fields[fieldno];
3894 pg_verifymbstr(fld, output_ptr - fld, false);
3898 /* Terminate attribute value in output area */
3899 *output_ptr++ = '\0';
3902 /* Done if we hit EOL instead of a delim */
3907 /* Clean up state of attribute_buf */
3909 Assert(*output_ptr == '\0');
3910 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
/*
 * CopyReadAttributesCSV: split cstate->line_buf into fields for CSV-format
 * COPY input, removing quoting and escaping into cstate->attribute_buf.
 *
 * What the visible lines establish:
 *   - The delimiter, quote and escape characters are the first bytes of
 *     cstate->delim, cstate->quote and cstate->escape.
 *   - When max_fields <= 0, a non-empty line is rejected with
 *     ERRCODE_BAD_COPY_FILE_FORMAT (extra data after last expected column).
 *   - attribute_buf is reset and sized from the line length once, before
 *     any field pointer is stored.
 *   - cstate->raw_fields is doubled with repalloc() whenever fieldno
 *     reaches max_fields.
 *   - Running out of input on the quoted-field path is reported as an
 *     unterminated CSV quoted field.
 *   - After an escape character, the next character is copied literally
 *     when it is itself the escape or the quote character.
 *   - Each field is NUL-terminated in the output area, and only then
 *     compared with cstate->null_print; the comparison uses the raw input
 *     bytes and is skipped when saw_quote is set, so a quoted field is
 *     never turned into NULL by this test.
 *   - On exit attribute_buf.len is set from output_ptr.
 *
 * NOTE(review): this listing omits many short lines (braces, loop headers,
 * break statements, updates of saw_quote, the return).  Presumably the
 * function returns the field count, as its header comment states for the
 * shared API -- confirm against the full source.
 */
3916 * Parse the current line into separate attributes (fields),
3917 * performing de-escaping as needed. This has exactly the same API as
3918 * CopyReadAttributesText, except we parse the fields according to
3919 * "standard" (i.e. common) CSV usage.
3922 CopyReadAttributesCSV(CopyState cstate)
3924 char delimc = cstate->delim[0];
3925 char quotec = cstate->quote[0];
3926 char escapec = cstate->escape[0];
3933 * We need a special case for zero-column tables: check that the input
3934 * line is empty, and return.
3936 if (cstate->max_fields <= 0)
3938 if (cstate->line_buf.len != 0)
3940 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3941 errmsg("extra data after last expected column")));
3945 resetStringInfo(&cstate->attribute_buf);
3948 * The de-escaped attributes will certainly not be longer than the input
3949 * data line, so we can just force attribute_buf to be large enough and
3950 * then transfer data without any checks for enough space. We need to do
3951 * it this way because enlarging attribute_buf mid-stream would invalidate
3952 * pointers already stored into cstate->raw_fields[].
3954 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3955 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3956 output_ptr = cstate->attribute_buf.data;
3958 /* set pointer variables for loop */
3959 cur_ptr = cstate->line_buf.data;
3960 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3962 /* Outer loop iterates over fields */
3966 bool found_delim = false;
3967 bool saw_quote = false;
3972 /* Make sure there is enough space for the next value */
3973 if (fieldno >= cstate->max_fields)
/* grow the pointer array by doubling its capacity */
3975 cstate->max_fields *= 2;
3976 cstate->raw_fields =
3977 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3980 /* Remember start of field on both input and output sides */
3981 start_ptr = cur_ptr;
3982 cstate->raw_fields[fieldno] = output_ptr;
3985 * Scan data for field,
3987 * The loop starts in "not quote" mode and then toggles between that
3988 * and "in quote" mode. The loop exits normally if it is in "not
3989 * quote" mode and a delimiter or line end is seen.
3999 if (cur_ptr >= line_end_ptr)
4002 /* unquoted field delimiter */
4008 /* start of quoted field (or part of field) */
4014 /* Add c to output string */
4022 if (cur_ptr >= line_end_ptr)
4024 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4025 errmsg("unterminated CSV quoted field")));
4029 /* escape within a quoted field */
4033 * peek at the next char if available, and escape it if it
4034 * is an escape char or a quote char
4036 if (cur_ptr < line_end_ptr)
4038 char nextc = *cur_ptr;
4040 if (nextc == escapec || nextc == quotec)
4042 *output_ptr++ = nextc;
4050 * end of quoted field. Must do this test after testing for
4051 * escape in case quote char and escape char are the same
4052 * (which is the common case).
4057 /* Add c to output string */
4063 /* Terminate attribute value in output area */
4064 *output_ptr++ = '\0';
4066 /* Check whether raw input matched null marker */
4067 input_len = end_ptr - start_ptr;
4068 if (!saw_quote && input_len == cstate->null_print_len &&
4069 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4070 cstate->raw_fields[fieldno] = NULL;
4073 /* Done if we hit EOL instead of a delim */
4078 /* Clean up state of attribute_buf */
4080 Assert(*output_ptr == '\0');
4081 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
/*
 * CopyReadBinaryAttribute: read one length-prefixed field from binary COPY
 * input and convert it with the receive function described by flinfo.
 *
 * Visible steps: a 32-bit field size is fetched with CopyGetInt32(), and a
 * failed fetch is reported as unexpected EOF.  One visible path calls
 * ReceiveFunctionCall() with a NULL buffer, and another rejects the size
 * as invalid.  Otherwise exactly fld_size bytes are loaded into
 * cstate->attribute_buf, the buffer is NUL-terminated, and the receive
 * function is called on it with typioparam and typmod.  If the buffer
 * cursor does not equal the buffer length afterwards, the data is rejected
 * with ERRCODE_INVALID_BINARY_REPRESENTATION.
 *
 * NOTE(review): the conditions guarding the NULL-buffer path and the
 * invalid-size error, the last parameter of the signature, and the final
 * return are not visible in this listing.  Presumably a size of -1 marks a
 * NULL field and any other negative size is invalid -- confirm against the
 * full source.  column_no is not referenced by any visible line.
 */
4088 * Read a binary attribute
4091 CopyReadBinaryAttribute(CopyState cstate,
4092 int column_no, FmgrInfo *flinfo,
4093 Oid typioparam, int32 typmod,
4099 if (!CopyGetInt32(cstate, &fld_size))
4101 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4102 errmsg("unexpected EOF in COPY data")));
4106 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4110 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4111 errmsg("invalid field size")));
4113 /* reset attribute_buf to empty, and load raw data in it */
4114 resetStringInfo(&cstate->attribute_buf);
/* reserve space first: the read below writes straight into data */
4116 enlargeStringInfo(&cstate->attribute_buf, fld_size);
/* a short read means the input ended in the middle of the field */
4117 if (CopyGetData(cstate, cstate->attribute_buf.data,
4118 fld_size, fld_size) != fld_size)
4120 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4121 errmsg("unexpected EOF in COPY data")));
/* len is maintained by hand because the buffer was filled directly */
4123 cstate->attribute_buf.len = fld_size;
4124 cstate->attribute_buf.data[fld_size] = '\0';
4126 /* Call the column type's binary input converter */
4127 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4128 typioparam, typmod);
4130 /* Trouble if it didn't eat the whole buffer */
4131 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4133 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4134 errmsg("incorrect binary data format")));
/*
 * CopyAttributeOutText: emit one attribute value in text-format COPY
 * output, backslash-escaping characters that would be misread on input.
 *
 * DUMPSOFAR() flushes the pending literal run, the bytes from start up to
 * ptr, with a single CopySendData() call.
 *
 * What the visible lines establish:
 *   - The delimiter is the first byte of cstate->delim.
 *   - When cstate->need_transcoding is set, the string is first converted
 *     with pg_server_to_any() to cstate->file_encoding.
 *   - The scan loop exists in two copies selected by
 *     cstate->encoding_embeds_ascii.  Only the first copy has a branch
 *     that advances ptr by pg_encoding_mblen() for a byte with the high
 *     bit set.
 *   - A byte below 0x20 that needs conversion is sent as a backslash
 *     followed by c, and the next literal run starts after it
 *     (start = ++ptr).
 *   - A backslash or the delimiter is preceded by a backslash, and the
 *     character itself becomes the first byte of the next literal run
 *     (start = ptr++).
 *
 * NOTE(review): this listing omits the guard inside DUMPSOFAR(), the
 * switch that picks the letter for each control character, the plain
 * pointer increments, and the final flush.  Presumably c is replaced by a
 * C-style letter before being sent -- confirm against the full source.
 */
4141 * Send text representation of one attribute, with conversion and escaping
4143 #define DUMPSOFAR() \
4146 CopySendData(cstate, start, ptr - start); \
4150 CopyAttributeOutText(CopyState cstate, char *string)
4155 char delimc = cstate->delim[0];
4157 if (cstate->need_transcoding)
4158 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4163 * We have to grovel through the string searching for control characters
4164 * and instances of the delimiter character. In most cases, though, these
4165 * are infrequent. To avoid overhead from calling CopySendData once per
4166 * character, we dump out all characters between escaped characters in a
4167 * single call. The loop invariant is that the data from "start" to "ptr"
4168 * can be sent literally, but hasn't yet been.
4170 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4171 * in valid backend encodings, extra bytes of a multibyte character never
4172 * look like ASCII. This loop is sufficiently performance-critical that
4173 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4174 * of the normal safe-encoding path.
4176 if (cstate->encoding_embeds_ascii)
4179 while ((c = *ptr) != '\0')
4181 if ((unsigned char) c < (unsigned char) 0x20)
4184 * \r and \n must be escaped, the others are traditional. We
4185 * prefer to dump these using the C-like notation, rather than
4186 * a backslash and the literal character, because it makes the
4187 * dump file a bit more proof against Microsoftish data
4211 /* If it's the delimiter, must backslash it */
4214 /* All ASCII control chars are length 1 */
4216 continue; /* fall to end of loop */
4218 /* if we get here, we need to convert the control char */
4220 CopySendChar(cstate, '\\');
4221 CopySendChar(cstate, c);
4222 start = ++ptr; /* do not include char in next run */
4224 else if (c == '\\' || c == delimc)
4227 CopySendChar(cstate, '\\');
4228 start = ptr++; /* we include char in next run */
4230 else if (IS_HIGHBIT_SET(c))
4231 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4239 while ((c = *ptr) != '\0')
4241 if ((unsigned char) c < (unsigned char) 0x20)
4244 * \r and \n must be escaped, the others are traditional. We
4245 * prefer to dump these using the C-like notation, rather than
4246 * a backslash and the literal character, because it makes the
4247 * dump file a bit more proof against Microsoftish data
4271 /* If it's the delimiter, must backslash it */
4274 /* All ASCII control chars are length 1 */
4276 continue; /* fall to end of loop */
4278 /* if we get here, we need to convert the control char */
4280 CopySendChar(cstate, '\\');
4281 CopySendChar(cstate, c);
4282 start = ++ptr; /* do not include char in next run */
4284 else if (c == '\\' || c == delimc)
4287 CopySendChar(cstate, '\\');
4288 start = ptr++; /* we include char in next run */
/*
 * CopyAttributeOutCSV: emit one attribute value in CSV-format COPY output,
 * quoting it when required.
 *
 * What the visible lines establish:
 *   - The delimiter, quote and escape characters are the first bytes of
 *     cstate->delim, cstate->quote and cstate->escape.
 *   - The value is compared with cstate->null_print before any encoding
 *     conversion, and only when use_quote is not already set.
 *   - When cstate->need_transcoding is set, the string is converted with
 *     pg_server_to_any() to cstate->file_encoding.
 *   - A preliminary pass looks for the delimiter, the quote character, a
 *     newline or a carriage return; with single_attr set, a value that is
 *     exactly a backslash followed by a period is also special-cased.
 *   - On the quoting path the value is wrapped in quotec, and every quote
 *     or escape character inside it is preceded by escapec.
 *   - On the non-quoting path the string is sent unchanged with
 *     CopySendString().
 *   - In both scans a byte with the high bit set advances the pointer by
 *     pg_encoding_mblen() when cstate->encoding_embeds_ascii is set.
 *
 * NOTE(review): the assignments that set use_quote, the plain pointer
 * increments and the flushes of the literal run are not visible in this
 * listing; presumably each test above forces quoting -- confirm against
 * the full source.
 */
4299 * Send text representation of one attribute, with conversion and
4300 * CSV-style escaping
4303 CopyAttributeOutCSV(CopyState cstate, char *string,
4304 bool use_quote, bool single_attr)
4309 char delimc = cstate->delim[0];
4310 char quotec = cstate->quote[0];
4311 char escapec = cstate->escape[0];
4313 /* force quoting if it matches null_print (before conversion!) */
4314 if (!use_quote && strcmp(string, cstate->null_print) == 0)
4317 if (cstate->need_transcoding)
4318 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4323 * Make a preliminary pass to discover if it needs quoting
4328 * Because '\.' can be a data value, quote it if it appears alone on a
4329 * line so it is not interpreted as the end-of-data marker.
4331 if (single_attr && strcmp(ptr, "\\.") == 0)
4337 while ((c = *tptr) != '\0')
4339 if (c == delimc || c == quotec || c == '\n' || c == '\r')
4344 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4345 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
/* opening quote of the quoted form */
4354 CopySendChar(cstate, quotec);
4357 * We adopt the same optimization strategy as in CopyAttributeOutText
4360 while ((c = *ptr) != '\0')
4362 if (c == quotec || c == escapec)
4365 CopySendChar(cstate, escapec);
4366 start = ptr; /* we include char in next run */
4368 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4369 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
/* closing quote of the quoted form */
4375 CopySendChar(cstate, quotec);
4379 /* If it doesn't need quoting, we can just dump it as-is */
4380 CopySendString(cstate, ptr);
/*
 * CopyGetAttnums: turn an optional list of column names into a list of
 * attribute numbers for the given tuple descriptor.
 *
 * What the visible lines establish:
 *   - With attnamelist == NIL, the attributes of tupDesc are walked in
 *     order and i + 1 is appended for each entry that is kept; the
 *     attisdropped flag is tested for every entry.
 *   - Otherwise each name is looked up by a linear scan of tupDesc->attrs
 *     using namestrcmp(), and the attnum stored in the matching entry is
 *     used.
 *   - A name with no match raises ERRCODE_UNDEFINED_COLUMN; one message
 *     variant includes the relation name and the other does not.
 *   - A name that resolves to an attnum already in the list raises
 *     ERRCODE_DUPLICATE_COLUMN.
 *   - Accepted attnums are appended in the order the names were given.
 *
 * NOTE(review): the statements that skip dropped columns, the test that
 * selects between the two undefined-column messages (presumably whether
 * rel is NULL), and the final return are not visible in this listing --
 * confirm against the full source.
 */
4385 * CopyGetAttnums - build an integer list of attnums to be copied
4387 * The input attnamelist is either the user-specified column list,
4388 * or NIL if there was none (in which case we want all the non-dropped
4391 * rel can be NULL ... it's only used for error reports.
4394 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4396 List *attnums = NIL;
4398 if (attnamelist == NIL)
4400 /* Generate default column list */
4401 Form_pg_attribute *attr = tupDesc->attrs;
4402 int attr_count = tupDesc->natts;
4405 for (i = 0; i < attr_count; i++)
4407 if (attr[i]->attisdropped)
4409 attnums = lappend_int(attnums, i + 1);
4414 /* Validate the user-supplied list and extract attnums */
4417 foreach(l, attnamelist)
4419 char *name = strVal(lfirst(l));
4423 /* Lookup column name */
4424 attnum = InvalidAttrNumber;
4425 for (i = 0; i < tupDesc->natts; i++)
4427 if (tupDesc->attrs[i]->attisdropped)
4429 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4431 attnum = tupDesc->attrs[i]->attnum;
4435 if (attnum == InvalidAttrNumber)
4439 (errcode(ERRCODE_UNDEFINED_COLUMN),
4440 errmsg("column \"%s\" of relation \"%s\" does not exist",
4441 name, RelationGetRelationName(rel))));
4444 (errcode(ERRCODE_UNDEFINED_COLUMN),
4445 errmsg("column \"%s\" does not exist",
4448 /* Check for duplicates */
4449 if (list_member_int(attnums, attnum))
4451 (errcode(ERRCODE_DUPLICATE_COLUMN),
4452 errmsg("column \"%s\" specified more than once",
4454 attnums = lappend_int(attnums, attnum);
/*
 * copy_dest_startup: startup callback of the COPY DestReceiver.
 *
 * CreateCopyDestReceiver installs this function as pub.rStartup.  It
 * receives the receiver itself, an operation code and a tuple descriptor.
 *
 * NOTE(review): the body is not visible in this listing, so what it does
 * with its arguments (possibly nothing) must be confirmed against the
 * full source.
 */
4463 * copy_dest_startup --- executor startup
4466 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/*
 * copy_dest_receive: per-tuple callback of the COPY DestReceiver.
 *
 * self is treated as a DR_copy, from which the CopyState is taken.  All
 * attributes of the slot are extracted first, because CopyOneRowTo() is
 * handed the tts_values and tts_isnull arrays of the slot directly.  The
 * row is sent with InvalidOid as the OID argument, and the processed
 * counter of the receiver is incremented once per tuple.
 *
 * NOTE(review): the return type and any return statement are not visible
 * in this listing.
 */
4472 * copy_dest_receive --- receive one tuple
4475 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4477 DR_copy *myState = (DR_copy *) self;
4478 CopyState cstate = myState->cstate;
4480 /* Make sure the tuple is fully deconstructed */
4481 slot_getallattrs(slot);
4483 /* And send the data */
4484 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4485 myState->processed++;
/*
 * copy_dest_shutdown: shutdown callback of the COPY DestReceiver.
 *
 * CreateCopyDestReceiver installs this function as pub.rShutdown.
 *
 * NOTE(review): the body is not visible in this listing, so whether it
 * performs any work must be confirmed against the full source.
 */
4491 * copy_dest_shutdown --- executor end
4494 copy_dest_shutdown(DestReceiver *self)
/*
 * copy_dest_destroy: destroy callback of the COPY DestReceiver.
 *
 * CreateCopyDestReceiver installs this function as pub.rDestroy, and the
 * receiver object it is given was allocated there with palloc().
 *
 * NOTE(review): the body is not visible in this listing; presumably it
 * frees the receiver -- confirm against the full source.
 */
4500 * copy_dest_destroy --- release DestReceiver object
4503 copy_dest_destroy(DestReceiver *self)
/*
 * CreateCopyDestReceiver: allocate and initialise a DestReceiver that
 * routes executor output to COPY.
 *
 * A DR_copy is allocated with palloc().  Its four callbacks are set to the
 * copy_dest_* functions of this file and its destination tag to
 * DestCopyOut.  The cstate pointer starts out NULL and must be filled in
 * by the caller before tuples arrive, since copy_dest_receive reads it.
 * The processed counter starts at zero.  The object is returned through
 * its DestReceiver base type.
 */
4509 * CreateCopyDestReceiver -- create a suitable DestReceiver object
4512 CreateCopyDestReceiver(void)
4514 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4516 self->pub.receiveSlot = copy_dest_receive;
4517 self->pub.rStartup = copy_dest_startup;
4518 self->pub.rShutdown = copy_dest_shutdown;
4519 self->pub.rDestroy = copy_dest_destroy;
4520 self->pub.mydest = DestCopyOut;
4522 self->cstate = NULL; /* will be set later */
4523 self->processed = 0;
4525 return (DestReceiver *) self;