1 /*-------------------------------------------------------------------------
4 * Implements the COPY utility command
6 * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.280 2007/04/16 01:14:55 tgl Exp $
13 *-------------------------------------------------------------------------
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
23 #include "access/heapam.h"
24 #include "access/xact.h"
25 #include "catalog/namespace.h"
26 #include "catalog/pg_type.h"
27 #include "commands/copy.h"
28 #include "commands/trigger.h"
29 #include "executor/executor.h"
30 #include "libpq/libpq.h"
31 #include "libpq/pqformat.h"
32 #include "mb/pg_wchar.h"
33 #include "miscadmin.h"
34 #include "optimizer/planner.h"
35 #include "parser/parse_relation.h"
36 #include "rewrite/rewriteHandler.h"
37 #include "storage/fd.h"
38 #include "tcop/tcopprot.h"
39 #include "utils/acl.h"
40 #include "utils/builtins.h"
41 #include "utils/lsyscache.h"
42 #include "utils/memutils.h"
/*
 * Octal-digit helpers.  ISOCTAL(c) is true when c is one of the ASCII digits
 * 0 through 7; OCTVALUE(c) turns such a digit character into its numeric
 * value by subtracting the character zero.  ISOCTAL expands its argument
 * twice, so do not pass an expression with side effects.
 */
45 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
46 #define OCTVALUE(c) ((c) - '0')
49 * Represents the different source/dest cases we need to worry about at
/*
 * Enumerators for the copy source/destination; the chosen value is stored in
 * CopyStateData.copy_dest.
 */
54 COPY_FILE, /* to/from file */
55 COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
56 COPY_NEW_FE /* to/from frontend (3.0 protocol) */
60 * Represents the end-of-line terminator type of the input
71 * This struct contains all the state variables used throughout a COPY
72 * operation. For simplicity, we use the same struct for all variants of COPY,
73 * even though some fields are used in only some cases.
75 * Multi-byte encodings: all supported client-side encodings encode multi-byte
76 * characters by having the first byte's high bit set. Subsequent bytes of the
77 * character can have the high bit not set. When scanning data in such an
78 * encoding to look for a match to a single-byte (ie ASCII) character, we must
79 * use the full pg_encoding_mblen() machinery to skip over multibyte
80 * characters, else we might find a false match to a trailing byte. In
81 * supported server encodings, there is no possibility of a false match, and
82 * it's faster to make useless comparisons to trailing bytes than it is to
83 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
84 * when we have to do it the hard way.
/*
 * CopyStateData - all working state for one COPY command.  The fields fall
 * into the groups marked below: low-level I/O state, options parsed from the
 * COPY statement, error-context fields, COPY TO working state, and the three
 * text-input buffers (attribute_buf, line_buf, raw_buf).
 *
 * NOTE(review): short source lines are missing from this listing.  The
 * raw_buf member described next to RAW_BUF_SIZE and the closing line of the
 * struct are not visible here - confirm against the complete file.
 */
86 typedef struct CopyStateData
88 /* low-level state data */
89 CopyDest copy_dest; /* type of copy source/destination */
90 FILE *copy_file; /* used if copy_dest == COPY_FILE */
91 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
92 * dest == COPY_NEW_FE in COPY FROM */
93 bool fe_copy; /* true for all FE copy dests */
94 bool fe_eof; /* true if detected end of copy data */
95 EolType eol_type; /* EOL type of input */
96 int client_encoding; /* remote side's character encoding */
97 bool need_transcoding; /* client encoding diff from server? */
98 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
99 uint64 processed; /* # of tuples processed */
101 /* parameters from the COPY command */
102 Relation rel; /* relation to copy to or from */
103 QueryDesc *queryDesc; /* executable query to copy from */
104 List *attnumlist; /* integer list of attnums to copy */
105 char *filename; /* filename, or NULL for STDIN/STDOUT */
106 bool binary; /* binary format? */
107 bool oids; /* include OIDs? */
108 bool csv_mode; /* Comma Separated Value format? */
109 bool header_line; /* CSV header line? */
110 char *null_print; /* NULL marker string (server encoding!) */
111 int null_print_len; /* length of same */
112 char *null_print_client; /* same converted to client encoding */
113 char *delim; /* column delimiter (must be 1 byte) */
114 char *quote; /* CSV quote char (must be 1 byte) */
115 char *escape; /* CSV escape char (must be 1 byte) */
116 bool *force_quote_flags; /* per-column CSV FQ flags */
117 bool *force_notnull_flags; /* per-column CSV FNN flags */
119 /* these are just for error messages, see copy_in_error_callback */
120 const char *cur_relname; /* table name for error messages */
121 int cur_lineno; /* line number for error messages */
122 const char *cur_attname; /* current att for error messages */
123 const char *cur_attval; /* current att value for error messages */
126 * Working state for COPY TO
128 FmgrInfo *out_functions; /* lookup info for output functions */
129 MemoryContext rowcontext; /* per-row evaluation context */
132 * These variables are used to reduce overhead in textual COPY FROM.
134 * attribute_buf holds the separated, de-escaped text for each field of
135 * the current line. The CopyReadAttributes functions return arrays of
136 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
137 * the buffer on each cycle.
139 StringInfoData attribute_buf;
142 * Similarly, line_buf holds the whole input line being processed. The
143 * input cycle is first to read the whole line into line_buf, convert it
144 * to server encoding there, and then extract the individual attribute
145 * fields into attribute_buf. line_buf is preserved unmodified so that we
146 * can display it in error messages if appropriate.
148 StringInfoData line_buf;
149 bool line_buf_converted; /* converted to server encoding? */
152 * Finally, raw_buf holds raw data read from the data source (file or
153 * client connection). CopyReadLine parses this data sufficiently to
154 * locate line boundaries, then transfers the data to line_buf and
155 * converts it. Note: we guarantee that there is a \0 at
156 * raw_buf[raw_buf_len].
158 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
160 int raw_buf_index; /* next byte to process */
161 int raw_buf_len; /* total # of bytes stored */
/*
 * Pointer alias for the COPY state; it is the first parameter of most of the
 * static routines declared in this file.
 */
164 typedef CopyStateData *CopyState;
166 /* DestReceiver for COPY (SELECT) TO */
/*
 * DoCopy obtains the receiver from CreateDestReceiver(DestCopyOut, ...) and
 * then stores the CopyState for the running command in the cstate member.
 */
169 DestReceiver pub; /* publicly-known function pointers */
170 CopyState cstate; /* CopyStateData for the command */
175 * These macros centralize code used to process line_buf and raw_buf buffers.
176 * They are macros because they often do continue/break control and to avoid
177 * function call overhead in tight COPY loops.
179 * We must use "if (1)" because "do {} while(0)" overrides the continue/break
180 * processing. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
184 * This keeps the character read at the top of the loop in the buffer
185 * even if there is more than one read-ahead.
/*
 * Expands inside a scanning loop that has raw_buf_ptr, prev_raw_ptr,
 * copy_buf_len and hit_eof in scope.  When the bytes needed for look-ahead
 * (raw_buf_ptr + extralen) reach the end of the loaded data and EOF has not
 * been seen, raw_buf_ptr is rewound to prev_raw_ptr so the current character
 * is fetched again after more data is loaded.
 * NOTE(review): the remaining statements of the macro body are not visible
 * in this listing - confirm against the full file.
 */
187 #define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
190 if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
192 raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
199 /* This consumes the remainder of the buffer and breaks */
/*
 * Counterpart for the case where EOF has already been hit: when the
 * look-ahead would run past copy_buf_len, raw_buf_ptr is moved to
 * copy_buf_len so the leftover bytes are consumed as data.  Needs
 * raw_buf_ptr, copy_buf_len and hit_eof in the expanding scope.
 * NOTE(review): the remaining statements of the macro body are not visible
 * in this listing - confirm against the full file.
 */
200 #define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
203 if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
206 raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
207 /* backslash just before EOF, treat as data char */ \
215 * Transfer any approved data to line_buf; must do this to be sure
216 * there is some room in raw_buf.
/*
 * Appends the already-scanned bytes of raw_buf, from cstate->raw_buf_index
 * up to (not including) raw_buf_ptr, to cstate->line_buf and then advances
 * raw_buf_index to raw_buf_ptr.  Does nothing when no bytes are pending.
 * Needs cstate and raw_buf_ptr in the expanding scope.
 */
218 #define REFILL_LINEBUF \
221 if (raw_buf_ptr > cstate->raw_buf_index) \
223 appendBinaryStringInfo(&cstate->line_buf, \
224 cstate->raw_buf + cstate->raw_buf_index, \
225 raw_buf_ptr - cstate->raw_buf_index); \
226 cstate->raw_buf_index = raw_buf_ptr; \
230 /* Undo any read-ahead and jump out of the block. */
/*
 * Resets raw_buf_ptr to one byte past prev_raw_ptr, discarding any further
 * look-ahead, and jumps to the label not_end_of_copy, which the expanding
 * function must define.
 */
231 #define NO_END_OF_COPY_GOTO \
234 raw_buf_ptr = prev_raw_ptr + 1; \
235 goto not_end_of_copy; \
/* Signature bytes for binary COPY data.  The literal spells exactly 11 characters (PGCOPY, newline, octal 377, CR, LF, NUL), matching the declared size, so no extra terminating NUL is stored. */
239 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
242 /* non-export function prototypes */
243 static void DoCopyTo(CopyState cstate);
244 static void CopyTo(CopyState cstate);
245 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
246 Datum *values, bool *nulls);
247 static void CopyFrom(CopyState cstate);
248 static bool CopyReadLine(CopyState cstate);
249 static bool CopyReadLineText(CopyState cstate);
250 static int CopyReadAttributesText(CopyState cstate, int maxfields,
252 static int CopyReadAttributesCSV(CopyState cstate, int maxfields,
254 static Datum CopyReadBinaryAttribute(CopyState cstate,
255 int column_no, FmgrInfo *flinfo,
256 Oid typioparam, int32 typmod,
258 static void CopyAttributeOutText(CopyState cstate, char *string);
259 static void CopyAttributeOutCSV(CopyState cstate, char *string,
260 bool use_quote, bool single_attr);
261 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
263 static char *limit_printout_length(const char *str);
265 /* Low-level communications functions */
266 static void SendCopyBegin(CopyState cstate);
267 static void ReceiveCopyBegin(CopyState cstate);
268 static void SendCopyEnd(CopyState cstate);
269 static void CopySendData(CopyState cstate, void *databuf, int datasize);
270 static void CopySendString(CopyState cstate, const char *str);
271 static void CopySendChar(CopyState cstate, char c);
272 static void CopySendEndOfRow(CopyState cstate);
273 static int CopyGetData(CopyState cstate, void *databuf,
274 int minread, int maxread);
275 static void CopySendInt32(CopyState cstate, int32 val);
276 static bool CopyGetInt32(CopyState cstate, int32 *val);
277 static void CopySendInt16(CopyState cstate, int16 val);
278 static bool CopyGetInt16(CopyState cstate, int16 *val);
282 * Send copy start/stop messages for frontend copies. These have changed
283 * in past protocol redesigns.
/*
 * SendCopyBegin - announce the start of a COPY OUT to the frontend.
 *
 * Branches on PG_PROTOCOL_MAJOR(FrontendProtocol):
 *   >= 3: builds an H message carrying the overall format (1 when
 *         cstate->binary, else 0), the number of entries in
 *         cstate->attnumlist as a 2-byte count, and one 2-byte format code
 *         per column; then records COPY_NEW_FE in cstate->copy_dest.
 *   >= 2: sends an empty H message and records COPY_OLD_FE.
 *   else: sends an empty B message and records COPY_OLD_FE.
 *
 * Both older-protocol branches contain an error report stating that COPY
 * BINARY is not supported to stdout or from stdin.
 * NOTE(review): the condition guarding that error, the declarations of buf
 * and i, and other short statements are not visible in this listing -
 * confirm against the full file.
 */
286 SendCopyBegin(CopyState cstate)
288 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
292 int natts = list_length(cstate->attnumlist);
293 int16 format = (cstate->binary ? 1 : 0);
296 pq_beginmessage(&buf, 'H');
297 pq_sendbyte(&buf, format); /* overall format */
298 pq_sendint(&buf, natts, 2);
299 for (i = 0; i < natts; i++)
300 pq_sendint(&buf, format, 2); /* per-column formats */
302 cstate->copy_dest = COPY_NEW_FE;
304 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
309 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
310 errmsg("COPY BINARY is not supported to stdout or from stdin")));
311 pq_putemptymessage('H');
312 /* grottiness needed for old COPY OUT protocol */
314 cstate->copy_dest = COPY_OLD_FE;
321 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
322 errmsg("COPY BINARY is not supported to stdout or from stdin")));
323 pq_putemptymessage('B');
324 /* grottiness needed for old COPY OUT protocol */
326 cstate->copy_dest = COPY_OLD_FE;
/*
 * ReceiveCopyBegin - tell the frontend that the backend is ready to receive
 * COPY IN data.
 *
 * Branches on PG_PROTOCOL_MAJOR(FrontendProtocol):
 *   >= 3: builds a G message with the overall format (1 when
 *         cstate->binary, else 0), a 2-byte column count taken from
 *         cstate->attnumlist and one 2-byte format code per column; records
 *         COPY_NEW_FE and allocates cstate->fe_msgbuf with makeStringInfo().
 *   >= 2: sends an empty G message and records COPY_OLD_FE.
 *   else: sends an empty D message and records COPY_OLD_FE.
 *
 * Both older-protocol branches contain an error report stating that COPY
 * BINARY is not supported to stdout or from stdin.
 * NOTE(review): the condition guarding that error and the flush call that
 * the final comment refers to are not visible in this listing - confirm
 * against the full file.
 */
331 ReceiveCopyBegin(CopyState cstate)
333 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
337 int natts = list_length(cstate->attnumlist);
338 int16 format = (cstate->binary ? 1 : 0);
341 pq_beginmessage(&buf, 'G');
342 pq_sendbyte(&buf, format); /* overall format */
343 pq_sendint(&buf, natts, 2);
344 for (i = 0; i < natts; i++)
345 pq_sendint(&buf, format, 2); /* per-column formats */
347 cstate->copy_dest = COPY_NEW_FE;
348 cstate->fe_msgbuf = makeStringInfo();
350 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
355 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
356 errmsg("COPY BINARY is not supported to stdout or from stdin")));
357 pq_putemptymessage('G');
358 cstate->copy_dest = COPY_OLD_FE;
365 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
366 errmsg("COPY BINARY is not supported to stdout or from stdin")));
367 pq_putemptymessage('D');
368 cstate->copy_dest = COPY_OLD_FE;
370 /* We *must* flush here to ensure FE knows it can send. */
/*
 * SendCopyEnd - finish a COPY OUT to the frontend.
 *
 * When cstate->copy_dest is COPY_NEW_FE, asserts that fe_msgbuf holds no
 * unsent bytes and sends an empty c message.  The remaining statements queue
 * the two-character backslash-period terminator, push it out through
 * CopySendEndOfRow, and call pq_endcopyout(false).
 * NOTE(review): the keyword separating the two paths is not visible in this
 * listing; the second group of statements presumably forms the non-NEW_FE
 * branch - confirm against the full file.
 */
375 SendCopyEnd(CopyState cstate)
377 if (cstate->copy_dest == COPY_NEW_FE)
379 /* Shouldn't have any unsent data */
380 Assert(cstate->fe_msgbuf->len == 0);
381 /* Send Copy Done message */
382 pq_putemptymessage('c');
386 CopySendData(cstate, "\\.", 2);
387 /* Need to flush out the trailer (this also appends a newline) */
388 CopySendEndOfRow(cstate);
389 pq_endcopyout(false);
394 * CopySendData sends output data to the destination (file or frontend)
395 * CopySendString does the same for null-terminated strings
396 * CopySendChar does the same for single characters
397 * CopySendEndOfRow does the appropriate thing at end of each data row
398 * (data is not actually flushed except by CopySendEndOfRow)
400 * NB: no data conversion is applied by these functions
/*
 * CopySendData - append datasize raw bytes starting at databuf to
 * cstate->fe_msgbuf.  Nothing is transmitted here; the bytes are only
 * accumulated in the buffer.
 */
404 CopySendData(CopyState cstate, void *databuf, int datasize)
406 appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize);
/*
 * CopySendString - append the NUL-terminated string str (without its
 * terminator) to cstate->fe_msgbuf.  str must not be NULL because strlen is
 * applied to it.
 */
410 CopySendString(CopyState cstate, const char *str)
412 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
/*
 * CopySendChar - append the single character c to cstate->fe_msgbuf using
 * the macro form of the StringInfo append.
 */
416 CopySendChar(CopyState cstate, char c)
418 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
/*
 * CopySendEndOfRow - terminate the current row and push the accumulated
 * bytes in cstate->fe_msgbuf to the destination selected by
 * cstate->copy_dest, then reset the buffer for the next row.
 *
 * Three output paths are visible:
 *   - file: a platform-dependent line ending (newline, or CR plus newline)
 *     is appended and the buffer is written to cstate->copy_file with
 *     fwrite; a stream error raises "could not write to COPY file".
 *   - old frontend protocol: a newline is appended and the buffer is sent
 *     with pq_putbytes; failure is reported as a lost connection.
 *   - new frontend protocol: a newline is appended and the whole row is sent
 *     as one d message.
 *
 * NOTE(review): the case labels, the conditions guarding the newline
 * appends, and the platform conditional are not visible in this listing -
 * confirm against the full file.
 */
422 CopySendEndOfRow(CopyState cstate)
424 StringInfo fe_msgbuf = cstate->fe_msgbuf;
426 switch (cstate->copy_dest)
431 /* Default line termination depends on platform */
433 CopySendChar(cstate, '\n');
435 CopySendString(cstate, "\r\n");
/* NOTE(review): the fwrite result is discarded; a failed or short write is detected only through ferror() below. */
439 (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
440 1, cstate->copy_file);
441 if (ferror(cstate->copy_file))
443 (errcode_for_file_access(),
444 errmsg("could not write to COPY file: %m")));
447 /* The FE/BE protocol uses \n as newline for all platforms */
449 CopySendChar(cstate, '\n');
451 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
453 /* no hope of recovering connection sync, so FATAL */
455 (errcode(ERRCODE_CONNECTION_FAILURE),
456 errmsg("connection lost during COPY to stdout")));
460 /* The FE/BE protocol uses \n as newline for all platforms */
462 CopySendChar(cstate, '\n');
464 /* Dump the accumulated row as one CopyData message */
/* NOTE(review): the pq_putmessage result is deliberately ignored here, unlike the pq_putbytes path above. */
465 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
469 resetStringInfo(fe_msgbuf);
473 * CopyGetData reads data from the source (file or frontend)
475 * We attempt to read at least minread, and at most maxread, bytes from
476 * the source. The actual number of bytes read is returned; if this is
477 * less than minread, EOF was detected.
479 * Note: when copying from the frontend, we expect a proper EOF mark per
480 * protocol; if the frontend simply drops the connection, we raise error.
481 * It seems unwise to allow the COPY IN to complete normally in that case.
483 * NB: no data conversion is applied here.
/*
 * CopyGetData - read raw bytes from the source selected by cstate->copy_dest
 * into databuf.
 *
 * Parameters:
 *   databuf  destination buffer; must have room for maxread bytes
 *   minread  number of bytes the caller needs
 *   maxread  upper bound on the number of bytes to store
 *
 * Visible paths:
 *   - file: one fread of up to maxread bytes from cstate->copy_file; a
 *     stream error raises "could not read from COPY file".
 *   - old frontend protocol: exactly minread bytes are requested with
 *     pq_getbytes; failure raises "unexpected EOF on client connection".
 *   - new frontend protocol: loops while more bytes are wanted and
 *     cstate->fe_eof is unset.  Whenever fe_msgbuf is exhausted a new
 *     message is fetched (type byte via pq_getbyte, body via pq_getmessage)
 *     and dispatched on its type: d supplies data, c sets fe_eof, f raises
 *     a query-canceled error carrying the client text, H is tolerated, and
 *     anything else is a protocol violation.  Available bytes are then
 *     copied out with pq_copymsgbytes and databuf is advanced.
 *
 * NOTE(review): the case labels, the bookkeeping of bytesread/maxread and
 * the return statement are not visible in this listing; the count returned
 * is presumably bytesread - confirm against the full file.
 */
486 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
490 switch (cstate->copy_dest)
493 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
494 if (ferror(cstate->copy_file))
496 (errcode_for_file_access(),
497 errmsg("could not read from COPY file: %m")));
502 * We cannot read more than minread bytes (which in practice is 1)
503 * because old protocol doesn't have any clear way of separating
504 * the COPY stream from following data. This is slow, but not any
505 * slower than the code path was originally, and we don't care
506 * much anymore about the performance of old protocol.
508 if (pq_getbytes((char *) databuf, minread))
510 /* Only a \. terminator is legal EOF in old protocol */
512 (errcode(ERRCODE_CONNECTION_FAILURE),
513 errmsg("unexpected EOF on client connection")));
518 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
522 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
524 /* Try to receive another message */
528 mtype = pq_getbyte();
531 (errcode(ERRCODE_CONNECTION_FAILURE),
532 errmsg("unexpected EOF on client connection")));
533 if (pq_getmessage(cstate->fe_msgbuf, 0))
535 (errcode(ERRCODE_CONNECTION_FAILURE),
536 errmsg("unexpected EOF on client connection")));
539 case 'd': /* CopyData */
541 case 'c': /* CopyDone */
542 /* COPY IN correctly terminated by frontend */
543 cstate->fe_eof = true;
545 case 'f': /* CopyFail */
547 (errcode(ERRCODE_QUERY_CANCELED),
548 errmsg("COPY from stdin failed: %s",
549 pq_getmsgstring(cstate->fe_msgbuf))));
551 case 'H': /* Flush */
555 * Ignore Flush/Sync for the convenience of client
556 * libraries (such as libpq) that may send those
557 * without noticing that the command they just
563 (errcode(ERRCODE_PROTOCOL_VIOLATION),
564 errmsg("unexpected message type 0x%02X during COPY from stdin",
569 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
572 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
573 databuf = (void *) ((char *) databuf + avail);
585 * These functions do apply some data conversion
589 * CopySendInt32 sends an int32 in network byte order
/*
 * CopySendInt32 - convert val to network byte order with htonl and queue
 * the resulting sizeof(buf) bytes through CopySendData.
 */
592 CopySendInt32(CopyState cstate, int32 val)
596 buf = htonl((uint32) val);
597 CopySendData(cstate, &buf, sizeof(buf));
601 * CopyGetInt32 reads an int32 that appears in network byte order
603 * Returns true if OK, false if EOF
/*
 * CopyGetInt32 - read one network-order 32-bit integer into *val.
 *
 * Requests exactly sizeof(buf) bytes (minread equals maxread).  On a short
 * read *val is set to 0; otherwise the bytes are converted with ntohl and
 * stored in *val.
 * NOTE(review): the return statements are not visible in this listing;
 * presumably false on the short-read path and true otherwise - confirm.
 */
606 CopyGetInt32(CopyState cstate, int32 *val)
610 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
612 *val = 0; /* suppress compiler warning */
615 *val = (int32) ntohl(buf);
620 * CopySendInt16 sends an int16 in network byte order
/*
 * CopySendInt16 - convert val to network byte order with htons and queue
 * the resulting sizeof(buf) bytes through CopySendData.
 */
623 CopySendInt16(CopyState cstate, int16 val)
627 buf = htons((uint16) val);
628 CopySendData(cstate, &buf, sizeof(buf));
632 * CopyGetInt16 reads an int16 that appears in network byte order
/*
 * CopyGetInt16 - read one network-order 16-bit integer into *val.
 *
 * Requests exactly sizeof(buf) bytes (minread equals maxread).  On a short
 * read *val is set to 0; otherwise the bytes are converted with ntohs and
 * stored in *val.
 * NOTE(review): the return statements are not visible in this listing;
 * presumably false on the short-read path and true otherwise - confirm.
 */
635 CopyGetInt16(CopyState cstate, int16 *val)
639 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
641 *val = 0; /* suppress compiler warning */
644 *val = (int16) ntohs(buf);
650 * CopyLoadRawBuf loads some more data into raw_buf
652 * Returns TRUE if able to obtain at least one more byte, else FALSE.
654 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
655 * down to the start of the buffer and then we load more data after that.
656 * This case is used only when a frontend multibyte character crosses a
657 * bufferload boundary.
/*
 * CopyLoadRawBuf - refill cstate->raw_buf from the data source.
 *
 * Any bytes not yet processed (raw_buf_index up to raw_buf_len) are first
 * moved to the front of the buffer with memmove.  CopyGetData is then asked
 * for at least 1 and at most RAW_BUF_SIZE minus the retained count, stored
 * right after the retained bytes.  Afterwards the buffer is NUL-terminated
 * at position nbytes, raw_buf_index is reset to 0 and raw_buf_len is set to
 * nbytes.  The result is true when CopyGetData delivered at least one byte.
 */
660 CopyLoadRawBuf(CopyState cstate)
665 if (cstate->raw_buf_index < cstate->raw_buf_len)
667 /* Copy down the unprocessed data */
668 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
669 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
673 nbytes = 0; /* no data need be saved */
675 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
676 1, RAW_BUF_SIZE - nbytes);
/* NOTE(review): nbytes is used below as the total buffer length; the statement adding inbytes to it is presumably among the lines missing from this listing - confirm. */
678 cstate->raw_buf[nbytes] = '\0';
679 cstate->raw_buf_index = 0;
680 cstate->raw_buf_len = nbytes;
681 return (inbytes > 0);
686 * DoCopy executes the SQL COPY statement
688 * Either unload or reload contents of table <relation>, depending on <from>.
689 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
690 * we also support copying the output of an arbitrary SELECT query.
692 * If <pipe> is false, transfer is between the table and the file named
693 * <filename>. Otherwise, transfer is between the table and our regular
694 * input/output stream. The latter could be either stdin/stdout or a
695 * socket, depending on whether we're running under Postmaster control.
697 * Iff <binary>, unload or reload in the binary format, as opposed to the
698 * more wasteful but more robust and portable text format.
700 * Iff <oids>, unload or reload the format that includes OID information.
701 * On input, we accept OIDs whether or not the table has an OID column,
702 * but silently drop them if it does not. On output, we report an error
703 * if the user asks for OIDs in a table that has none (not providing an
704 * OID column might seem friendlier, but could seriously confuse programs).
706 * If in the text format, delimit columns with delimiter <delim> and print
707 * NULL values as <null_print>.
709 * Do not allow a Postgres user without superuser privilege to read from
710 * or write to a file.
712 * Do not allow the copy if user doesn't have proper permission to access
/*
 * DoCopy - entry point for executing a COPY statement.
 *
 * Parameters:
 *   stmt         parsed COPY statement (is_from, filename, attlist, options,
 *                relation and query are read here)
 *   queryString  source text, passed on to pg_analyze_and_rewrite
 *
 * Visible flow:
 *   1. Allocate a zeroed CopyStateData and walk stmt->options, recognizing
 *      binary, oids, delimiter, null, csv, header, quote, escape,
 *      force_quote and force_notnull.  Each branch can raise the
 *      conflicting-or-redundant-options error; an unknown name is an elog.
 *   2. Reject BINARY combined with DELIMITER, CSV or NULL; fill in defaults
 *      (comma delimiter in CSV mode else tab, empty NULL marker in CSV mode
 *      else backslash-N, double-quote as CSV quote, escape defaulting to the
 *      quote); then validate: single-character delimiter, quote and escape,
 *      no CR or LF in the delimiter or NULL marker, no backslash delimiter
 *      outside CSV mode, HEADER/QUOTE/ESCAPE/FORCE options only in CSV mode,
 *      FORCE QUOTE only for COPY TO, FORCE NOT NULL only for COPY FROM, and
 *      neither the delimiter nor the CSV quote inside the NULL marker.
 *   3. Require superuser whenever a filename was given.
 *   4. Relation form: open the relation with RowExclusiveLock (COPY FROM)
 *      or AccessShareLock (COPY TO), check its ACL, refuse COPY FROM in a
 *      read-only transaction unless the relation is in a temp namespace,
 *      and refuse OIDS for a table without them.
 *      Query form: refuse OIDS, analyze and rewrite a copy of stmt->query,
 *      require exactly one resulting SELECT without INTO, plan it, refresh
 *      ActiveSnapshot->curcid, create a DestCopyOut receiver pointing back
 *      at cstate, build the QueryDesc and call ExecutorStart.
 *   5. Build attnumlist and the per-column FORCE QUOTE / FORCE NOT NULL
 *      flag arrays (each named column must be in attnumlist), initialize
 *      attribute_buf, line_buf, raw_buf and the encoding fields, then call
 *      CopyFrom or DoCopyTo.
 *   6. Close the relation (keeping the lock for COPY FROM) or end and free
 *      the query, save cstate->processed, and free the working buffers.
 *
 * NOTE(review): many short lines (local declarations, the duplicate-option
 * tests for several options, the branch choosing between the relation and
 * query forms, the ereport openers and the final return) are not visible in
 * this listing.  The access mode handed to pg_class_aclcheck is presumably
 * required_access (ACL_INSERT for COPY FROM, ACL_SELECT otherwise) and the
 * function presumably returns the saved processed count - confirm against
 * the full file.
 */
716 DoCopy(const CopyStmt *stmt, const char *queryString)
719 bool is_from = stmt->is_from;
720 bool pipe = (stmt->filename == NULL);
721 List *attnamelist = stmt->attlist;
722 List *force_quote = NIL;
723 List *force_notnull = NIL;
724 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
731 /* Allocate workspace and zero all fields */
732 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
734 /* Extract options from the statement node tree */
735 foreach(option, stmt->options)
737 DefElem *defel = (DefElem *) lfirst(option);
739 if (strcmp(defel->defname, "binary") == 0)
743 (errcode(ERRCODE_SYNTAX_ERROR),
744 errmsg("conflicting or redundant options")));
745 cstate->binary = intVal(defel->arg);
747 else if (strcmp(defel->defname, "oids") == 0)
751 (errcode(ERRCODE_SYNTAX_ERROR),
752 errmsg("conflicting or redundant options")));
753 cstate->oids = intVal(defel->arg);
755 else if (strcmp(defel->defname, "delimiter") == 0)
759 (errcode(ERRCODE_SYNTAX_ERROR),
760 errmsg("conflicting or redundant options")));
761 cstate->delim = strVal(defel->arg);
763 else if (strcmp(defel->defname, "null") == 0)
765 if (cstate->null_print)
767 (errcode(ERRCODE_SYNTAX_ERROR),
768 errmsg("conflicting or redundant options")));
769 cstate->null_print = strVal(defel->arg);
771 else if (strcmp(defel->defname, "csv") == 0)
773 if (cstate->csv_mode)
775 (errcode(ERRCODE_SYNTAX_ERROR),
776 errmsg("conflicting or redundant options")));
777 cstate->csv_mode = intVal(defel->arg);
779 else if (strcmp(defel->defname, "header") == 0)
781 if (cstate->header_line)
783 (errcode(ERRCODE_SYNTAX_ERROR),
784 errmsg("conflicting or redundant options")));
785 cstate->header_line = intVal(defel->arg);
787 else if (strcmp(defel->defname, "quote") == 0)
791 (errcode(ERRCODE_SYNTAX_ERROR),
792 errmsg("conflicting or redundant options")));
793 cstate->quote = strVal(defel->arg);
795 else if (strcmp(defel->defname, "escape") == 0)
799 (errcode(ERRCODE_SYNTAX_ERROR),
800 errmsg("conflicting or redundant options")));
801 cstate->escape = strVal(defel->arg);
803 else if (strcmp(defel->defname, "force_quote") == 0)
807 (errcode(ERRCODE_SYNTAX_ERROR),
808 errmsg("conflicting or redundant options")));
809 force_quote = (List *) defel->arg;
811 else if (strcmp(defel->defname, "force_notnull") == 0)
815 (errcode(ERRCODE_SYNTAX_ERROR),
816 errmsg("conflicting or redundant options")));
817 force_notnull = (List *) defel->arg;
820 elog(ERROR, "option \"%s\" not recognized",
824 /* Check for incompatible options */
825 if (cstate->binary && cstate->delim)
827 (errcode(ERRCODE_SYNTAX_ERROR),
828 errmsg("cannot specify DELIMITER in BINARY mode")));
830 if (cstate->binary && cstate->csv_mode)
832 (errcode(ERRCODE_SYNTAX_ERROR),
833 errmsg("cannot specify CSV in BINARY mode")));
835 if (cstate->binary && cstate->null_print)
837 (errcode(ERRCODE_SYNTAX_ERROR),
838 errmsg("cannot specify NULL in BINARY mode")));
840 /* Set defaults for omitted options */
842 cstate->delim = cstate->csv_mode ? "," : "\t";
844 if (!cstate->null_print)
845 cstate->null_print = cstate->csv_mode ? "" : "\\N";
846 cstate->null_print_len = strlen(cstate->null_print);
848 if (cstate->csv_mode)
851 cstate->quote = "\"";
853 cstate->escape = cstate->quote;
856 /* Only single-character delimiter strings are supported. */
857 if (strlen(cstate->delim) != 1)
859 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
860 errmsg("COPY delimiter must be a single character")));
862 /* Disallow end-of-line characters */
863 if (strchr(cstate->delim, '\r') != NULL ||
864 strchr(cstate->delim, '\n') != NULL)
866 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
867 errmsg("COPY delimiter cannot be newline or carriage return")));
869 if (strchr(cstate->null_print, '\r') != NULL ||
870 strchr(cstate->null_print, '\n') != NULL)
872 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
873 errmsg("COPY null representation cannot use newline or carriage return")));
875 /* Disallow backslash in non-CSV mode */
876 if (!cstate->csv_mode && strchr(cstate->delim, '\\') != NULL)
878 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
879 errmsg("COPY delimiter cannot be backslash")));
882 if (!cstate->csv_mode && cstate->header_line)
884 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
885 errmsg("COPY HEADER available only in CSV mode")));
888 if (!cstate->csv_mode && cstate->quote != NULL)
890 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
891 errmsg("COPY quote available only in CSV mode")));
893 if (cstate->csv_mode && strlen(cstate->quote) != 1)
895 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
896 errmsg("COPY quote must be a single character")));
899 if (!cstate->csv_mode && cstate->escape != NULL)
901 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
902 errmsg("COPY escape available only in CSV mode")));
904 if (cstate->csv_mode && strlen(cstate->escape) != 1)
906 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
907 errmsg("COPY escape must be a single character")));
909 /* Check force_quote */
910 if (!cstate->csv_mode && force_quote != NIL)
912 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
913 errmsg("COPY force quote available only in CSV mode")));
914 if (force_quote != NIL && is_from)
916 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
917 errmsg("COPY force quote only available using COPY TO")));
919 /* Check force_notnull */
920 if (!cstate->csv_mode && force_notnull != NIL)
922 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
923 errmsg("COPY force not null available only in CSV mode")));
924 if (force_notnull != NIL && !is_from)
926 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
927 errmsg("COPY force not null only available using COPY FROM")));
929 /* Don't allow the delimiter to appear in the null string. */
930 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
932 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
933 errmsg("COPY delimiter must not appear in the NULL specification")));
935 /* Don't allow the CSV quote char to appear in the null string. */
936 if (cstate->csv_mode &&
937 strchr(cstate->null_print, cstate->quote[0]) != NULL)
939 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
940 errmsg("CSV quote character must not appear in the NULL specification")));
942 /* Disallow file COPY except to superusers. */
943 if (!pipe && !superuser())
945 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
946 errmsg("must be superuser to COPY to or from a file"),
947 errhint("Anyone can COPY to stdout or from stdin. "
948 "psql's \\copy command also works for anyone.")));
952 Assert(!stmt->query);
953 cstate->queryDesc = NULL;
955 /* Open and lock the relation, using the appropriate lock type. */
956 cstate->rel = heap_openrv(stmt->relation,
957 (is_from ? RowExclusiveLock : AccessShareLock));
959 /* Check relation permissions. */
960 aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel),
963 if (aclresult != ACLCHECK_OK)
964 aclcheck_error(aclresult, ACL_KIND_CLASS,
965 RelationGetRelationName(cstate->rel));
967 /* check read-only transaction */
968 if (XactReadOnly && is_from &&
969 !isTempNamespace(RelationGetNamespace(cstate->rel)))
971 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
972 errmsg("transaction is read-only")));
974 /* Don't allow COPY w/ OIDs to or from a table without them */
975 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
977 (errcode(ERRCODE_UNDEFINED_COLUMN),
978 errmsg("table \"%s\" does not have OIDs",
979 RelationGetRelationName(cstate->rel))));
981 tupDesc = RelationGetDescr(cstate->rel);
993 /* Don't allow COPY w/ OIDs from a select */
996 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
997 errmsg("COPY (SELECT) WITH OIDS is not supported")));
1000 * Run parse analysis and rewrite. Note this also acquires sufficient
1001 * locks on the source table(s).
1003 * Because the parser and planner tend to scribble on their input, we
1004 * make a preliminary copy of the source querytree. This prevents
1005 * problems in the case that the COPY is in a portal or plpgsql
1006 * function and is executed repeatedly. (See also the same hack in
1007 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1009 rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query),
1010 queryString, NULL, 0);
1012 /* We don't expect more or less than one result query */
1013 if (list_length(rewritten) != 1)
1014 elog(ERROR, "unexpected rewrite result");
1016 query = (Query *) linitial(rewritten);
1017 Assert(query->commandType == CMD_SELECT);
1019 /* Query mustn't use INTO, either */
1022 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1023 errmsg("COPY (SELECT INTO) is not supported")));
1025 /* plan the query */
1026 plan = planner(query, 0, NULL);
1029 * Update snapshot command ID to ensure this query sees results of any
1030 * previously executed queries. (It's a bit cheesy to modify
1031 * ActiveSnapshot without making a copy, but for the limited ways in
1032 * which COPY can be invoked, I think it's OK, because the active
1033 * snapshot shouldn't be shared with anything else anyway.)
1035 ActiveSnapshot->curcid = GetCurrentCommandId();
1037 /* Create dest receiver for COPY OUT */
1038 dest = CreateDestReceiver(DestCopyOut, NULL);
1039 ((DR_copy *) dest)->cstate = cstate;
1041 /* Create a QueryDesc requesting no output */
1042 cstate->queryDesc = CreateQueryDesc(plan,
1043 ActiveSnapshot, InvalidSnapshot,
1047 * Call ExecutorStart to prepare the plan for execution.
1049 * ExecutorStart computes a result tupdesc for us
1051 ExecutorStart(cstate->queryDesc, 0);
1053 tupDesc = cstate->queryDesc->tupDesc;
1056 /* Generate or convert list of attributes to process */
1057 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1059 num_phys_attrs = tupDesc->natts;
1061 /* Convert FORCE QUOTE name list to per-column flags, check validity */
1062 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1068 attnums = CopyGetAttnums(tupDesc, cstate->rel, force_quote);
1070 foreach(cur, attnums)
1072 int attnum = lfirst_int(cur);
1074 if (!list_member_int(cstate->attnumlist, attnum))
1076 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1077 errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
1078 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1079 cstate->force_quote_flags[attnum - 1] = true;
1083 /* Convert FORCE NOT NULL name list to per-column flags, check validity */
1084 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1090 attnums = CopyGetAttnums(tupDesc, cstate->rel, force_notnull);
1092 foreach(cur, attnums)
1094 int attnum = lfirst_int(cur);
1096 if (!list_member_int(cstate->attnumlist, attnum))
1098 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1099 errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
1100 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1101 cstate->force_notnull_flags[attnum - 1] = true;
1105 /* Set up variables to avoid per-attribute overhead. */
1106 initStringInfo(&cstate->attribute_buf);
1107 initStringInfo(&cstate->line_buf);
1108 cstate->line_buf_converted = false;
1109 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
1110 cstate->raw_buf_index = cstate->raw_buf_len = 0;
1111 cstate->processed = 0;
1114 * Set up encoding conversion info. Even if the client and server
1115 * encodings are the same, we must apply pg_client_to_server() to validate
1116 * data in multibyte encodings.
1118 cstate->client_encoding = pg_get_client_encoding();
1119 cstate->need_transcoding =
1120 (cstate->client_encoding != GetDatabaseEncoding() ||
1121 pg_database_encoding_max_length() > 1);
1122 /* See Multibyte encoding comment above */
1123 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
1125 cstate->copy_dest = COPY_FILE; /* default */
1126 cstate->filename = stmt->filename;
1129 CopyFrom(cstate); /* copy from file to database */
1131 DoCopyTo(cstate); /* copy from database to file */
1134 * Close the relation or query. If reading, we can release the
1135 * AccessShareLock we got; if writing, we should hold the lock until end
1136 * of transaction to ensure that updates will be committed before lock is
1140 heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
1143 /* Close down the query and free resources. */
1144 ExecutorEnd(cstate->queryDesc);
1145 FreeQueryDesc(cstate->queryDesc);
1148 /* Clean up storage (probably not really necessary) */
1149 processed = cstate->processed;
1151 pfree(cstate->attribute_buf.data);
1152 pfree(cstate->line_buf.data);
1153 pfree(cstate->raw_buf);
1161 * This intermediate routine exists mainly to localize the effects of setjmp
1162 * so we don't need to plaster a lot of variables with "volatile".
/*
 * DoCopyTo - set up the output destination for COPY TO, run the copy, and
 * tear the destination down again.
 *
 * Visible flow:
 *   1. A relation whose relkind is not RELKIND_RELATION is rejected, with
 *      distinct messages for views (plus a hint to use the COPY (SELECT ...)
 *      TO form), sequences and other non-table relations.
 *   2. Destination: with no filename, output goes to the frontend when
 *      whereToSendOutput is DestRemote (fe_copy is set), otherwise to
 *      stdout.  With a filename, relative paths are refused, the file is
 *      opened for binary writing via AllocateFile under umask 022, an open
 *      failure is reported, and a directory is rejected after closing it.
 *   3. SendCopyBegin and SendCopyEnd bracket the copy when fe_copy is set;
 *      the error path calls pq_endcopyout(true).
 *   4. The output file is closed with FreeFile and a failure there is
 *      reported as a write error.
 *
 * NOTE(review): the guard for the relation check, the error-trapping
 * construct, the call that performs the copy, the restoration of the saved
 * umask and the condition around the final FreeFile are not visible in this
 * listing - confirm against the full file.
 */
1165 DoCopyTo(CopyState cstate)
1167 bool pipe = (cstate->filename == NULL);
1171 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
1173 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
1175 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1176 errmsg("cannot copy from view \"%s\"",
1177 RelationGetRelationName(cstate->rel)),
1178 errhint("Try the COPY (SELECT ...) TO variant.")));
1179 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
1181 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1182 errmsg("cannot copy from sequence \"%s\"",
1183 RelationGetRelationName(cstate->rel))));
1186 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1187 errmsg("cannot copy from non-table relation \"%s\"",
1188 RelationGetRelationName(cstate->rel))));
1194 if (whereToSendOutput == DestRemote)
1195 cstate->fe_copy = true;
1197 cstate->copy_file = stdout;
1201 mode_t oumask; /* Pre-existing umask value */
1205 * Prevent write to relative path ... too easy to shoot oneself in the
1206 * foot by overwriting a database file ...
1208 if (!is_absolute_path(cstate->filename))
1210 (errcode(ERRCODE_INVALID_NAME),
1211 errmsg("relative path not allowed for COPY to file")));
1213 oumask = umask((mode_t) 022);
1214 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1217 if (cstate->copy_file == NULL)
1219 (errcode_for_file_access(),
1220 errmsg("could not open file \"%s\" for writing: %m",
1221 cstate->filename)));
/* NOTE(review): the fstat result is not checked, so st.st_mode is read below even if the call failed. */
1223 fstat(fileno(cstate->copy_file), &st);
1224 if (S_ISDIR(st.st_mode))
1226 FreeFile(cstate->copy_file);
1228 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1229 errmsg("\"%s\" is a directory", cstate->filename)));
1235 if (cstate->fe_copy)
1236 SendCopyBegin(cstate);
1240 if (cstate->fe_copy)
1241 SendCopyEnd(cstate);
1246 * Make sure we turn off old-style COPY OUT mode upon error. It is
1247 * okay to do this in all cases, since it does nothing if the mode is
1250 pq_endcopyout(true);
1257 if (FreeFile(cstate->copy_file))
1259 (errcode_for_file_access(),
1260 errmsg("could not write to file \"%s\": %m",
1261 cstate->filename)));
1266 * Copy from relation or query TO file.
1269 CopyTo(CopyState cstate)
1273 Form_pg_attribute *attr;
1277 tupDesc = RelationGetDescr(cstate->rel);
1279 tupDesc = cstate->queryDesc->tupDesc;
1280 attr = tupDesc->attrs;
1281 num_phys_attrs = tupDesc->natts;
1282 cstate->null_print_client = cstate->null_print; /* default */
1284 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1285 cstate->fe_msgbuf = makeStringInfo();
1287 /* Get info about the columns we need to process. */
1288 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1289 foreach(cur, cstate->attnumlist)
1291 int attnum = lfirst_int(cur);
1296 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1300 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1303 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1307 * Create a temporary memory context that we can reset once per row to
1308 * recover palloc'd memory. This avoids any problems with leaks inside
1309 * datatype output routines, and should be faster than retail pfree's
1310 * anyway. (We don't need a whole econtext as CopyFrom does.)
1312 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1314 ALLOCSET_DEFAULT_MINSIZE,
1315 ALLOCSET_DEFAULT_INITSIZE,
1316 ALLOCSET_DEFAULT_MAXSIZE);
1320 /* Generate header for a binary copy */
1324 CopySendData(cstate, (char *) BinarySignature, 11);
1329 CopySendInt32(cstate, tmp);
1330 /* No header extension */
1332 CopySendInt32(cstate, tmp);
1337 * For non-binary copy, we need to convert null_print to client
1338 * encoding, because it will be sent directly with CopySendString.
1340 if (cstate->need_transcoding)
1341 cstate->null_print_client = pg_server_to_client(cstate->null_print,
1342 cstate->null_print_len);
1344 /* if a header has been requested send the line */
1345 if (cstate->header_line)
1347 bool hdr_delim = false;
1349 foreach(cur, cstate->attnumlist)
1351 int attnum = lfirst_int(cur);
1355 CopySendChar(cstate, cstate->delim[0]);
1358 colname = NameStr(attr[attnum - 1]->attname);
1360 CopyAttributeOutCSV(cstate, colname, false,
1361 list_length(cstate->attnumlist) == 1);
1364 CopySendEndOfRow(cstate);
1372 HeapScanDesc scandesc;
1375 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1376 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
1378 scandesc = heap_beginscan(cstate->rel, ActiveSnapshot, 0, NULL);
1380 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
1382 CHECK_FOR_INTERRUPTS();
1384 /* Deconstruct the tuple ... faster than repeated heap_getattr */
1385 heap_deform_tuple(tuple, tupDesc, values, nulls);
1387 /* Format and send the data */
1388 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
1391 heap_endscan(scandesc);
1395 /* run the plan --- the dest receiver will send tuples */
1396 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
1401 /* Generate trailer for a binary copy */
1402 CopySendInt16(cstate, -1);
1403 /* Need to flush out the trailer */
1404 CopySendEndOfRow(cstate);
1407 MemoryContextDelete(cstate->rowcontext);
1411 * Emit one row during CopyTo().
1414 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
1416 bool need_delim = false;
1417 FmgrInfo *out_functions = cstate->out_functions;
1418 MemoryContext oldcontext;
1422 MemoryContextReset(cstate->rowcontext);
1423 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1427 /* Binary per-tuple header */
1428 CopySendInt16(cstate, list_length(cstate->attnumlist));
1429 /* Send OID if wanted --- note attnumlist doesn't include it */
1432 /* Hack --- assume Oid is same size as int32 */
1433 CopySendInt32(cstate, sizeof(int32));
1434 CopySendInt32(cstate, tupleOid);
1439 /* Text format has no per-tuple header, but send OID if wanted */
1440 /* Assume digits don't need any quoting or encoding conversion */
1443 string = DatumGetCString(DirectFunctionCall1(oidout,
1444 ObjectIdGetDatum(tupleOid)));
1445 CopySendString(cstate, string);
1450 foreach(cur, cstate->attnumlist)
1452 int attnum = lfirst_int(cur);
1453 Datum value = values[attnum - 1];
1454 bool isnull = nulls[attnum - 1];
1456 if (!cstate->binary)
1459 CopySendChar(cstate, cstate->delim[0]);
1465 if (!cstate->binary)
1466 CopySendString(cstate, cstate->null_print_client);
1468 CopySendInt32(cstate, -1);
1472 if (!cstate->binary)
1474 string = OutputFunctionCall(&out_functions[attnum - 1],
1476 if (cstate->csv_mode)
1477 CopyAttributeOutCSV(cstate, string,
1478 cstate->force_quote_flags[attnum - 1],
1479 list_length(cstate->attnumlist) == 1);
1481 CopyAttributeOutText(cstate, string);
1487 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
1489 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
1490 CopySendData(cstate, VARDATA(outputbytes),
1491 VARSIZE(outputbytes) - VARHDRSZ);
1496 CopySendEndOfRow(cstate);
1498 MemoryContextSwitchTo(oldcontext);
1500 cstate->processed++;
1505 * error context callback for COPY FROM
1508 copy_in_error_callback(void *arg)
1510 CopyState cstate = (CopyState) arg;
1514 /* can't usefully display the data */
1515 if (cstate->cur_attname)
1516 errcontext("COPY %s, line %d, column %s",
1517 cstate->cur_relname, cstate->cur_lineno,
1518 cstate->cur_attname);
1520 errcontext("COPY %s, line %d",
1521 cstate->cur_relname, cstate->cur_lineno);
1525 if (cstate->cur_attname && cstate->cur_attval)
1527 /* error is relevant to a particular column */
1530 attval = limit_printout_length(cstate->cur_attval);
1531 errcontext("COPY %s, line %d, column %s: \"%s\"",
1532 cstate->cur_relname, cstate->cur_lineno,
1533 cstate->cur_attname, attval);
1536 else if (cstate->cur_attname)
1538 /* error is relevant to a particular column, value is NULL */
1539 errcontext("COPY %s, line %d, column %s: null input",
1540 cstate->cur_relname, cstate->cur_lineno,
1541 cstate->cur_attname);
1545 /* error is relevant to a particular line */
1546 if (cstate->line_buf_converted || !cstate->need_transcoding)
1550 lineval = limit_printout_length(cstate->line_buf.data);
1551 errcontext("COPY %s, line %d: \"%s\"",
1552 cstate->cur_relname, cstate->cur_lineno, lineval);
1558 * Here, the line buffer is still in a foreign encoding, and
1559 * indeed it's quite likely that the error is precisely a
1560 * failure to do encoding conversion (ie, bad data). We dare
1561 * not try to convert it, and at present there's no way to
1562 * regurgitate it without conversion. So we have to punt and
1563 * just report the line number.
1565 errcontext("COPY %s, line %d",
1566 cstate->cur_relname, cstate->cur_lineno);
/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * It would seem a lot easier to just use the sprintf "precision" limit to
 * truncate the string.  However, some versions of glibc have a bug/misfeature
 * that vsnprintf will always fail (return -1) if it is asked to truncate
 * a string that contains invalid byte sequences for the current encoding.
 * So, do our own truncation.
 *
 * The result is always a freshly allocated string: a plain copy when str is
 * at most MAX_COPY_DATA_DISPLAY bytes long, otherwise a prefix clipped by
 * pg_mbcliplen with "..." appended.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			input_len = strlen(str);
	int			keep_len;
	char	   *clipped;

	if (input_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Let the encoding-aware clipper decide where to cut */
		keep_len = pg_mbcliplen(str, input_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the kept bytes, the three dots and the terminator */
		clipped = (char *) palloc(keep_len + 4);
		memcpy(clipped, str, keep_len);
		strcpy(clipped + keep_len, "...");

		return clipped;
	}

	/* Short enough to show in full */
	return pstrdup(str);
}
/*
 * CopyFrom: load rows from the COPY input into the table cstate->rel.
 *
 * NOTE(review): a number of short lines of this function (braces, else
 * keywords, the ereport(ERROR, openers, some local declarations) are not
 * present in this copy of the file, so this summary is limited to what the
 * visible statements show.
 *
 * Setup:
 *	- the target must have relkind RELKIND_RELATION; views, sequences and
 *	  other relkinds each get their own ERRCODE_WRONG_OBJECT_TYPE message;
 *	- use_wal and use_fsm start out true; tests on rd_createSubid,
 *	  rd_newRelfilenodeSubid and XLogArchivingActive() guard the places
 *	  where they are presumably switched off (the assignments themselves are
 *	  not visible here, TODO confirm);
 *	- input comes from ReceiveCopyBegin() when whereToSendOutput is
 *	  DestRemote, from stdin, or from AllocateFile(filename, PG_BINARY_R);
 *	  a file that cannot be opened, or that is a directory, is rejected;
 *	- a ResultRelInfo with its open indexes, a tuple slot and the per-tuple
 *	  expression context are prepared on the EState;
 *	- input functions and typioparams are looked up for every non-dropped
 *	  column, and default expressions are prepared for the columns that are
 *	  not members of cstate->attnumlist;
 *	- AFTER trigger bookkeeping is started and the BEFORE STATEMENT insert
 *	  triggers are fired;
 *	- for binary input the 11-byte signature, the flags word and the header
 *	  extension length are read and validated.
 *
 * Per row: the per-tuple context is reset, fields are read (text and CSV via
 * CopyReadLine plus CopyReadAttributesText/CSV, binary via
 * CopyReadBinaryAttribute), column defaults are evaluated, the tuple is
 * built with heap_formtuple, BEFORE ROW triggers and constraints are
 * applied, and the tuple is stored with heap_insert, followed by index
 * entries and AFTER ROW triggers.  cstate->processed is incremented for
 * each row that gets that far.
 *
 * Teardown: the error context callback is popped, AFTER STATEMENT triggers
 * run, queued AFTER triggers are handled, work arrays are freed, the slot,
 * indexes and executor state are released, the input file is closed with
 * FreeFile (a failure there is reported), and heap_sync is called,
 * presumably only when WAL was skipped (the guarding test is not visible).
 *
 * NOTE(review): the return value of fstat() on the input file is not
 * checked before st.st_mode is examined.  If fstat can fail here, st is read
 * uninitialized; consider reporting the failure instead.
 */
1608 * Copy FROM file to relation.
1611 CopyFrom(CopyState cstate)
/* presumably selects client/stdin input over a named file; the test that uses it is not visible here */
1613 bool pipe = (cstate->filename == NULL);
1616 Form_pg_attribute *attr;
1617 AttrNumber num_phys_attrs,
/* one input-function entry per physical column, indexed by attnum - 1 */
1620 FmgrInfo *in_functions;
1621 FmgrInfo oid_in_function;
/* de-escaped field values produced by CopyReadAttributesText/CSV for the current line */
1630 char **field_strings;
1633 ResultRelInfo *resultRelInfo;
1634 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
1635 TupleTableSlot *slot;
1638 ExprState **defexprs; /* array of default att expressions */
1639 ExprContext *econtext; /* used for ExecEvalExpr for default atts */
/* the context in effect on entry; restored before triggers run and at the end */
1640 MemoryContext oldcontext = CurrentMemoryContext;
1641 ErrorContextCallback errcontext;
/* command id handed to heap_insert() for every loaded tuple */
1642 CommandId mycid = GetCurrentCommandId();
1643 bool use_wal = true; /* by default, use WAL logging */
1644 bool use_fsm = true; /* by default, use FSM for free space */
/* a target relation is required; everything below dereferences cstate->rel */
1646 Assert(cstate->rel);
/* only plain tables are accepted; each other relkind gets its own message */
1648 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
1650 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
1652 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1653 errmsg("cannot copy to view \"%s\"",
1654 RelationGetRelationName(cstate->rel))));
1655 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
1657 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1658 errmsg("cannot copy to sequence \"%s\"",
1659 RelationGetRelationName(cstate->rel))));
1662 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1663 errmsg("cannot copy to non-table relation \"%s\"",
1664 RelationGetRelationName(cstate->rel))));
1668 * Check to see if we can avoid writing WAL
1670 * If archive logging is not enabled *and* either
1671 * - table was created in same transaction as this COPY
1672 * - data is being written to relfilenode created in this transaction
1673 * then we can skip writing WAL. It's safe because if the transaction
1674 * doesn't commit, we'll discard the table (or the new relfilenode file).
1675 * If it does commit, we'll have done the heap_sync at the bottom of this
1678 * As mentioned in comments in utils/rel.h, the in-same-transaction test
1679 * is not completely reliable, since in rare cases rd_createSubid or
1680 * rd_newRelfilenodeSubid can be cleared before the end of the transaction.
1681 * However this is OK since at worst we will fail to make the optimization.
1683 * When skipping WAL it's entirely possible that COPY itself will write no
1684 * WAL records at all. This is of concern because RecordTransactionCommit
1685 * might decide it doesn't need to log our eventual commit, which we
1686 * certainly need it to do. However, we need no special action here for
1687 * that, because if we have a new table or new relfilenode then there
1688 * must have been a WAL-logged pg_class update earlier in the transaction.
1690 * Also, if the target file is new-in-transaction, we assume that checking
1691 * FSM for free space is a waste of time, even if we must use WAL because
1692 * of archiving. This could possibly be wrong, but it's unlikely.
1694 * The comments for heap_insert and RelationGetBufferForTuple specify that
1695 * skipping WAL logging is only safe if we ensure that our tuples do not
1696 * go into pages containing tuples from any other transactions --- but this
1697 * must be the case if we have a new table or new relfilenode, so we need
1698 * no additional work to enforce that.
1701 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
1702 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
1705 if (!XLogArchivingActive())
1711 if (whereToSendOutput == DestRemote)
1712 ReceiveCopyBegin(cstate);
1714 cstate->copy_file = stdin;
1720 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1722 if (cstate->copy_file == NULL)
1724 (errcode_for_file_access(),
1725 errmsg("could not open file \"%s\" for reading: %m",
1726 cstate->filename)));
1728 fstat(fileno(cstate->copy_file), &st);
1729 if (S_ISDIR(st.st_mode))
1731 FreeFile(cstate->copy_file);
1733 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1734 errmsg("\"%s\" is a directory", cstate->filename)));
1738 tupDesc = RelationGetDescr(cstate->rel);
1739 attr = tupDesc->attrs;
1740 num_phys_attrs = tupDesc->natts;
1741 attr_count = list_length(cstate->attnumlist);
1745 * We need a ResultRelInfo so we can use the regular executor's
1746 * index-entry-making machinery. (There used to be a huge amount of code
1747 * here that basically duplicated execUtils.c ...)
1749 resultRelInfo = makeNode(ResultRelInfo);
1750 resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
1751 resultRelInfo->ri_RelationDesc = cstate->rel;
1752 resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
1753 if (resultRelInfo->ri_TrigDesc)
1754 resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
1755 palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
1756 resultRelInfo->ri_TrigInstrument = NULL;
1758 ExecOpenIndices(resultRelInfo);
1760 estate->es_result_relations = resultRelInfo;
1761 estate->es_num_result_relations = 1;
1762 estate->es_result_relation_info = resultRelInfo;
1764 /* Set up a tuple slot too */
1765 slot = MakeSingleTupleTableSlot(tupDesc);
1767 econtext = GetPerTupleExprContext(estate);
1770 * Pick up the required catalog information for each attribute in the
1771 * relation, including the input function, the element type (to pass to
1772 * the input function), and info about defaults and constraints. (Which
1773 * input function we use depends on text/binary format choice.)
1775 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1776 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1777 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1778 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1780 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
1782 /* We don't need info for dropped attributes */
1783 if (attr[attnum - 1]->attisdropped)
1786 /* Fetch the input function and typioparam info */
1788 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
1789 &in_func_oid, &typioparams[attnum - 1]);
1791 getTypeInputInfo(attr[attnum - 1]->atttypid,
1792 &in_func_oid, &typioparams[attnum - 1]);
1793 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1795 /* Get default info if needed */
1796 if (!list_member_int(cstate->attnumlist, attnum))
1798 /* attribute is NOT to be copied from input */
1799 /* use default value if one exists */
1800 Node *defexpr = build_column_default(cstate->rel, attnum);
1802 if (defexpr != NULL)
1804 defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
1806 defmap[num_defaults] = attnum - 1;
1812 /* Prepare to catch AFTER triggers. */
1813 AfterTriggerBeginQuery();
1816 * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
1817 * should do this for COPY, since it's not really an "INSERT" statement as
1818 * such. However, executing these triggers maintains consistency with the
1819 * EACH ROW triggers that we already fire on COPY.
1821 ExecBSInsertTriggers(estate, resultRelInfo);
1823 if (!cstate->binary)
1824 file_has_oids = cstate->oids; /* must rely on user to tell us... */
1827 /* Read and verify binary header */
1832 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
1833 memcmp(readSig, BinarySignature, 11) != 0)
1835 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1836 errmsg("COPY file signature not recognized")));
1838 if (!CopyGetInt32(cstate, &tmp))
1840 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1841 errmsg("invalid COPY file header (missing flags)")));
1842 file_has_oids = (tmp & (1 << 16)) != 0;
1844 if ((tmp >> 16) != 0)
1846 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1847 errmsg("unrecognized critical flags in COPY file header")));
1848 /* Header extension length */
1849 if (!CopyGetInt32(cstate, &tmp) ||
1852 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1853 errmsg("invalid COPY file header (missing length)")));
1854 /* Skip extension header, if present */
1857 if (CopyGetData(cstate, readSig, 1, 1) != 1)
1859 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1860 errmsg("invalid COPY file header (wrong length)")));
1864 if (file_has_oids && cstate->binary)
1866 getTypeBinaryInputInfo(OIDOID,
1867 &in_func_oid, &oid_typioparam);
1868 fmgr_info(in_func_oid, &oid_in_function);
1871 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1872 nulls = (char *) palloc(num_phys_attrs * sizeof(char));
1874 /* create workspace for CopyReadAttributes results */
1875 nfields = file_has_oids ? (attr_count + 1) : attr_count;
1876 field_strings = (char **) palloc(nfields * sizeof(char *));
1878 /* Initialize state variables */
1879 cstate->fe_eof = false;
1880 cstate->eol_type = EOL_UNKNOWN;
1881 cstate->cur_relname = RelationGetRelationName(cstate->rel);
1882 cstate->cur_lineno = 0;
1883 cstate->cur_attname = NULL;
1884 cstate->cur_attval = NULL;
1886 /* Set up callback to identify error line number */
1887 errcontext.callback = copy_in_error_callback;
1888 errcontext.arg = (void *) cstate;
1889 errcontext.previous = error_context_stack;
1890 error_context_stack = &errcontext;
1892 /* on input just throw the header line away */
1893 if (cstate->header_line)
1895 cstate->cur_lineno++;
1896 done = CopyReadLine(cstate);
1902 Oid loaded_oid = InvalidOid;
1904 CHECK_FOR_INTERRUPTS();
1906 cstate->cur_lineno++;
1908 /* Reset the per-tuple exprcontext */
1909 ResetPerTupleExprContext(estate);
1911 /* Switch into its memory context */
1912 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1914 /* Initialize all values for row to NULL */
1915 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
1916 MemSet(nulls, 'n', num_phys_attrs * sizeof(char));
1918 if (!cstate->binary)
1925 /* Actually read the line into memory here */
1926 done = CopyReadLine(cstate);
1929 * EOF at start of line means we're done. If we see EOF after
1930 * some characters, we act as though it was newline followed by
1931 * EOF, ie, process the line and then exit loop on next iteration.
1933 if (done && cstate->line_buf.len == 0)
1936 /* Parse the line into de-escaped field values */
1937 if (cstate->csv_mode)
1938 fldct = CopyReadAttributesCSV(cstate, nfields, field_strings);
1940 fldct = CopyReadAttributesText(cstate, nfields, field_strings);
1943 /* Read the OID field if present */
1946 if (fieldno >= fldct)
1948 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1949 errmsg("missing data for OID column")));
1950 string = field_strings[fieldno++];
1954 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1955 errmsg("null OID in COPY data")));
1958 cstate->cur_attname = "oid";
1959 cstate->cur_attval = string;
1960 loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
1961 CStringGetDatum(string)));
1962 if (loaded_oid == InvalidOid)
1964 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1965 errmsg("invalid OID in COPY data")));
1966 cstate->cur_attname = NULL;
1967 cstate->cur_attval = NULL;
1971 /* Loop to read the user attributes on the line. */
1972 foreach(cur, cstate->attnumlist)
1974 int attnum = lfirst_int(cur);
1977 if (fieldno >= fldct)
1979 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1980 errmsg("missing data for column \"%s\"",
1981 NameStr(attr[m]->attname))));
1982 string = field_strings[fieldno++];
1984 if (cstate->csv_mode && string == NULL &&
1985 cstate->force_notnull_flags[m])
1987 /* Go ahead and read the NULL string */
1988 string = cstate->null_print;
1991 cstate->cur_attname = NameStr(attr[m]->attname);
1992 cstate->cur_attval = string;
1993 values[m] = InputFunctionCall(&in_functions[m],
1996 attr[m]->atttypmod);
1999 cstate->cur_attname = NULL;
2000 cstate->cur_attval = NULL;
2003 Assert(fieldno == nfields);
2011 if (!CopyGetInt16(cstate, &fld_count) ||
2018 if (fld_count != attr_count)
2020 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2021 errmsg("row field count is %d, expected %d",
2022 (int) fld_count, attr_count)));
2026 cstate->cur_attname = "oid";
2028 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
2034 if (isnull || loaded_oid == InvalidOid)
2036 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2037 errmsg("invalid OID in COPY data")));
2038 cstate->cur_attname = NULL;
2042 foreach(cur, cstate->attnumlist)
2044 int attnum = lfirst_int(cur);
2047 cstate->cur_attname = NameStr(attr[m]->attname);
2049 values[m] = CopyReadBinaryAttribute(cstate,
2055 nulls[m] = isnull ? 'n' : ' ';
2056 cstate->cur_attname = NULL;
2061 * Now compute and insert any defaults available for the columns not
2062 * provided by the input data. Anything not processed here or above
2065 for (i = 0; i < num_defaults; i++)
2067 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
2070 nulls[defmap[i]] = ' ';
2073 /* And now we can form the input tuple. */
2074 tuple = heap_formtuple(tupDesc, values, nulls);
2076 if (cstate->oids && file_has_oids)
2077 HeapTupleSetOid(tuple, loaded_oid);
2079 /* Triggers and stuff need to be invoked in query context. */
2080 MemoryContextSwitchTo(oldcontext);
2084 /* BEFORE ROW INSERT Triggers */
2085 if (resultRelInfo->ri_TrigDesc &&
2086 resultRelInfo->ri_TrigDesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
2090 newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
2092 if (newtuple == NULL) /* "do nothing" */
2094 else if (newtuple != tuple) /* modified by Trigger(s) */
2096 heap_freetuple(tuple);
2103 /* Place tuple in tuple slot */
2104 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2106 /* Check the constraints of the tuple */
2107 if (cstate->rel->rd_att->constr)
2108 ExecConstraints(resultRelInfo, slot, estate);
2110 /* OK, store the tuple and create index entries for it */
2111 heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm);
2113 if (resultRelInfo->ri_NumIndices > 0)
2114 ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
2116 /* AFTER ROW INSERT Triggers */
2117 ExecARInsertTriggers(estate, resultRelInfo, tuple);
2120 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2121 * this is the same definition used by execMain.c for counting
2122 * tuples inserted by an INSERT command.
2124 cstate->processed++;
2128 /* Done, clean up */
2129 error_context_stack = errcontext.previous;
2131 MemoryContextSwitchTo(oldcontext);
2133 /* Execute AFTER STATEMENT insertion triggers */
2134 ExecASInsertTriggers(estate, resultRelInfo);
2136 /* Handle queued AFTER triggers */
2137 AfterTriggerEndQuery(estate);
2141 pfree(field_strings);
2143 pfree(in_functions);
2148 ExecDropSingleTupleTableSlot(slot);
2150 ExecCloseIndices(resultRelInfo);
2152 FreeExecutorState(estate);
2156 if (FreeFile(cstate->copy_file))
2158 (errcode_for_file_access(),
2159 errmsg("could not read from file \"%s\": %m",
2160 cstate->filename)));
2164 * If we skipped writing WAL, then we need to sync the heap (but not
2165 * indexes since those use WAL anyway)
2168 heap_sync(cstate->rel);
2173 * Read the next input line and stash it in line_buf, with conversion to
2176 * Result is true if read was terminated by EOF, false if terminated
2177 * by newline. The terminating newline or EOF marker is not included
2178 * in the final value of line_buf.
2181 CopyReadLine(CopyState cstate)
2185 resetStringInfo(&cstate->line_buf);
2187 /* Mark that encoding conversion hasn't occurred yet */
2188 cstate->line_buf_converted = false;
2190 /* Parse data and transfer into line_buf */
2191 result = CopyReadLineText(cstate);
2196 * Reached EOF. In protocol version 3, we should ignore anything
2197 * after \. up to the protocol end of copy data. (XXX maybe better
2198 * not to treat \. as special?)
2200 if (cstate->copy_dest == COPY_NEW_FE)
2204 cstate->raw_buf_index = cstate->raw_buf_len;
2205 } while (CopyLoadRawBuf(cstate));
2211 * If we didn't hit EOF, then we must have transferred the EOL marker
2212 * to line_buf along with the data. Get rid of it.
2214 switch (cstate->eol_type)
2217 Assert(cstate->line_buf.len >= 1);
2218 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2219 cstate->line_buf.len--;
2220 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2223 Assert(cstate->line_buf.len >= 1);
2224 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
2225 cstate->line_buf.len--;
2226 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2229 Assert(cstate->line_buf.len >= 2);
2230 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
2231 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2232 cstate->line_buf.len -= 2;
2233 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2236 /* shouldn't get here */
2242 /* Done reading the line. Convert it to server encoding. */
2243 if (cstate->need_transcoding)
2247 cvt = pg_client_to_server(cstate->line_buf.data,
2248 cstate->line_buf.len);
2249 if (cvt != cstate->line_buf.data)
2251 /* transfer converted data back to line_buf */
2252 resetStringInfo(&cstate->line_buf);
2253 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
2258 /* Now it's safe to use the buffer in error messages */
2259 cstate->line_buf_converted = true;
2265 * CopyReadLineText - inner loop of CopyReadLine for text mode
2268 CopyReadLineText(CopyState cstate)
2273 bool need_data = false;
2274 bool hit_eof = false;
2275 bool result = false;
2279 bool first_char_in_line = true;
2280 bool in_quote = false,
2281 last_was_esc = false;
2283 char escapec = '\0';
2285 if (cstate->csv_mode)
2287 quotec = cstate->quote[0];
2288 escapec = cstate->escape[0];
2289 /* ignore special escape processing if it's the same as quotec */
2290 if (quotec == escapec)
2294 mblen_str[1] = '\0';
2297 * The objective of this loop is to transfer the entire next input line
2298 * into line_buf. Hence, we only care for detecting newlines (\r and/or
2299 * \n) and the end-of-copy marker (\.).
2301 * In CSV mode, \r and \n inside a quoted field are just part of the data
2302 * value and are put in line_buf. We keep just enough state to know if we
2303 * are currently in a quoted field or not.
2305 * These four characters, and the CSV escape and quote characters, are
2306 * assumed the same in frontend and backend encodings.
2308 * For speed, we try to move data from raw_buf to line_buf in chunks
2309 * rather than one character at a time. raw_buf_ptr points to the next
2310 * character to examine; any characters from raw_buf_index to raw_buf_ptr
2311 * have been determined to be part of the line, but not yet transferred to
2314 * For a little extra speed within the loop, we copy raw_buf and
2315 * raw_buf_len into local variables.
2317 copy_raw_buf = cstate->raw_buf;
2318 raw_buf_ptr = cstate->raw_buf_index;
2319 copy_buf_len = cstate->raw_buf_len;
2327 * Load more data if needed. Ideally we would just force four bytes
2328 * of read-ahead and avoid the many calls to
2329 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
2330 * does not allow us to read too far ahead or we might read into the
2331 * next data, so we read-ahead only as far we know we can. One
2332 * optimization would be to read-ahead four bytes here if
2333 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
2334 * considering the size of the buffer.
2336 if (raw_buf_ptr >= copy_buf_len || need_data)
2341 * Try to read some more data. This will certainly reset
2342 * raw_buf_index to zero, and raw_buf_ptr must go with it.
2344 if (!CopyLoadRawBuf(cstate))
2347 copy_buf_len = cstate->raw_buf_len;
2350 * If we are completely out of data, break out of the loop,
2353 if (copy_buf_len <= 0)
2361 /* OK to fetch a character */
2362 prev_raw_ptr = raw_buf_ptr;
2363 c = copy_raw_buf[raw_buf_ptr++];
2365 if (cstate->csv_mode)
2368 * If character is '\\' or '\r', we may need to look ahead below.
2369 * Force fetch of the next character if we don't already have it.
2370 * We need to do this before changing CSV state, in case one of
2371 * these characters is also the quote or escape character.
2373 * Note: old-protocol does not like forced prefetch, but it's OK
2374 * here since we cannot validly be at EOF.
2376 if (c == '\\' || c == '\r')
2378 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2382 * Dealing with quotes and escapes here is mildly tricky. If the
2383 * quote char is also the escape char, there's no problem - we
2384 * just use the char as a toggle. If they are different, we need
2385 * to ensure that we only take account of an escape inside a
2386 * quoted field and immediately preceding a quote char, and not
2387 * the second in an escape-escape sequence.
2389 if (in_quote && c == escapec)
2390 last_was_esc = !last_was_esc;
2391 if (c == quotec && !last_was_esc)
2392 in_quote = !in_quote;
2394 last_was_esc = false;
2397 * Updating the line count for embedded CR and/or LF chars is
2398 * necessarily a little fragile - this test is probably about the
2399 * best we can do. (XXX it's arguable whether we should do this
2400 * at all --- is cur_lineno a physical or logical count?)
2402 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
2403 cstate->cur_lineno++;
2407 if (c == '\r' && (!cstate->csv_mode || !in_quote))
2409 /* Check for \r\n on first line, _and_ handle \r\n. */
2410 if (cstate->eol_type == EOL_UNKNOWN ||
2411 cstate->eol_type == EOL_CRNL)
2414 * If need more data, go back to loop top to load it.
2416 * Note that if we are at EOF, c will wind up as '\0' because
2417 * of the guaranteed pad of raw_buf.
2419 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2422 c = copy_raw_buf[raw_buf_ptr];
2426 raw_buf_ptr++; /* eat newline */
2427 cstate->eol_type = EOL_CRNL; /* in case not set yet */
2431 /* found \r, but no \n */
2432 if (cstate->eol_type == EOL_CRNL)
2434 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2435 errmsg(!cstate->csv_mode ?
2436 "literal carriage return found in data" :
2437 "unquoted carriage return found in data"),
2438 errhint(!cstate->csv_mode ?
2439 "Use \"\\r\" to represent carriage return." :
2440 "Use quoted CSV field to represent carriage return.")));
2443 * if we got here, it is the first line and we didn't find
2444 * \n, so don't consume the peeked character
2446 cstate->eol_type = EOL_CR;
2449 else if (cstate->eol_type == EOL_NL)
2451 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2452 errmsg(!cstate->csv_mode ?
2453 "literal carriage return found in data" :
2454 "unquoted carriage return found in data"),
2455 errhint(!cstate->csv_mode ?
2456 "Use \"\\r\" to represent carriage return." :
2457 "Use quoted CSV field to represent carriage return.")));
2458 /* If reach here, we have found the line terminator */
2463 if (c == '\n' && (!cstate->csv_mode || !in_quote))
2465 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
2467 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2468 errmsg(!cstate->csv_mode ?
2469 "literal newline found in data" :
2470 "unquoted newline found in data"),
2471 errhint(!cstate->csv_mode ?
2472 "Use \"\\n\" to represent newline." :
2473 "Use quoted CSV field to represent newline.")));
2474 cstate->eol_type = EOL_NL; /* in case not set yet */
2475 /* If reach here, we have found the line terminator */
2480 * In CSV mode, we only recognize \. alone on a line. This is because
2481 * \. is a valid CSV data value.
2483 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
2487 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2488 IF_NEED_REFILL_AND_EOF_BREAK(0);
2491 * get next character
2492 * Note: we do not change c so if it isn't \., we can fall
2493 * through and continue processing for client encoding.
2496 c2 = copy_raw_buf[raw_buf_ptr];
2500 raw_buf_ptr++; /* consume the '.' */
2503 * Note: if we loop back for more data here, it does not
2504 * matter that the CSV state change checks are re-executed; we
2505 * will come back here with no important state changed.
2507 if (cstate->eol_type == EOL_CRNL)
2509 /* Get the next character */
2510 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2511 /* if hit_eof, c2 will become '\0' */
2512 c2 = copy_raw_buf[raw_buf_ptr++];
2516 if (!cstate->csv_mode)
2518 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2519 errmsg("end-of-copy marker does not match previous newline style")));
2521 NO_END_OF_COPY_GOTO;
2523 else if (c2 != '\r')
2525 if (!cstate->csv_mode)
2527 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2528 errmsg("end-of-copy marker corrupt")));
2530 NO_END_OF_COPY_GOTO;
2534 /* Get the next character */
2535 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2536 /* if hit_eof, c2 will become '\0' */
2537 c2 = copy_raw_buf[raw_buf_ptr++];
2539 if (c2 != '\r' && c2 != '\n')
2541 if (!cstate->csv_mode)
2543 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2544 errmsg("end-of-copy marker corrupt")));
2546 NO_END_OF_COPY_GOTO;
2549 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
2550 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
2551 (cstate->eol_type == EOL_CR && c2 != '\r'))
2554 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2555 errmsg("end-of-copy marker does not match previous newline style")));
2559 * Transfer only the data before the \. into line_buf, then
2560 * discard the data and the \. sequence.
2562 if (prev_raw_ptr > cstate->raw_buf_index)
2563 appendBinaryStringInfo(&cstate->line_buf,
2564 cstate->raw_buf + cstate->raw_buf_index,
2565 prev_raw_ptr - cstate->raw_buf_index);
2566 cstate->raw_buf_index = raw_buf_ptr;
2567 result = true; /* report EOF */
2570 else if (!cstate->csv_mode)
2573 * If we are here, it means we found a backslash followed by
2574 * something other than a period. In non-CSV mode, anything
2575 * after a backslash is special, so we skip over that second
2576 * character too. If we didn't do that, \\. would be
2577 * considered an end-of-copy marker, while in non-CSV mode it is a
2578 * literal backslash followed by a period. In CSV mode,
2579 * backslashes are not special, so we want to process the
2580 * character after the backslash just like a normal character,
2581 * so we don't increment in those cases.
2587 * This label is for CSV cases where \. appears at the start of a
2588 * line, but there is more text after it, meaning it was a data value.
2589 * We are more strict for \. in CSV mode because \. could be a data
2590 * value, while in non-CSV mode, \. cannot be a data value.
2595 * Process all bytes of a multi-byte character as a group.
2597 * We only support multi-byte sequences where the first byte has the
2598 * high-bit set, so as an optimization we can avoid this block
2599 * entirely if it is not set.
2601 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
2606 /* All our encodings only read the first byte to get the length */
2607 mblen = pg_encoding_mblen(cstate->client_encoding, mblen_str);
2608 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
2609 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
2610 raw_buf_ptr += mblen - 1;
2612 first_char_in_line = false;
2613 } /* end of outer loop */
2616 * Transfer any still-uncopied data to line_buf.
/*
 * Return decimal value for a hexadecimal digit.
 *
 * hex is expected to be a valid hexadecimal digit ('0'-'9', 'a'-'f' or
 * 'A'-'F'); callers in this file test isxdigit() before calling.  Decimal
 * digits map to 0..9, letters (either case) map to 10..15.  The result for
 * any other character is meaningless.
 */
static int
GetDecimalFromHex(char hex)
{
	/* Cast to unsigned char: <ctype.h> functions are undefined for negative values. */
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}
2636 * Parse the current line into separate attributes (fields),
2637 * performing de-escaping as needed.
2639 * The input is in line_buf. We use attribute_buf to hold the result
2640 * strings. fieldvals[k] is set to point to the k'th attribute string,
2641 * or NULL when the input matches the null marker string. (Note that the
2642 * caller cannot check for nulls since the returned string would be the
2643 * post-de-escaping equivalent, which may look the same as some valid data
2646 * delim is the column delimiter string (must be just one byte for now).
2647 * null_print is the null marker string. Note that this is compared to
2648 * the pre-de-escaped input string.
2650 * The return value is the number of fields actually read. (We error out
2651 * if this would exceed maxfields, which is the length of fieldvals[].)
/*
 * CopyReadAttributesText: split the text-format line held in
 * cstate->line_buf into fields.
 *
 * Visible behavior: the single-byte delimiter is cstate->delim[0];
 * de-escaped field text is written into cstate->attribute_buf and
 * fieldvals[k] is pointed at the start of the k-th field there, or set to
 * NULL when the raw (pre-de-escaping) field text equals cstate->null_print.
 * Raises ERRCODE_BAD_COPY_FILE_FORMAT ("extra data after last expected
 * column") when more fields are present than maxfields allows.
 *
 * NOTE(review): this listing omits many short lines (declarations, braces,
 * switch labels, break statements), so the control flow shown below is
 * incomplete; consult the full source before modifying.
 */
2654 CopyReadAttributesText(CopyState cstate, int maxfields, char **fieldvals)
2656 char delimc = cstate->delim[0];
2663 * We need a special case for zero-column tables: check that the input
2664 * line is empty, and return.
2668 if (cstate->line_buf.len != 0)
2670 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2671 errmsg("extra data after last expected column")));
2675 resetStringInfo(&cstate->attribute_buf);
2678 * The de-escaped attributes will certainly not be longer than the input
2679 * data line, so we can just force attribute_buf to be large enough and
2680 * then transfer data without any checks for enough space. We need to do
2681 * it this way because enlarging attribute_buf mid-stream would invalidate
2682 * pointers already stored into fieldvals[].
2684 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
2685 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
2686 output_ptr = cstate->attribute_buf.data;
2688 /* set pointer variables for loop */
2689 cur_ptr = cstate->line_buf.data;
2690 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
2692 /* Outer loop iterates over fields */
2696 bool found_delim = false;
2701 /* Make sure space remains in fieldvals[] */
2702 if (fieldno >= maxfields)
2704 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2705 errmsg("extra data after last expected column")));
2707 /* Remember start of field on both input and output sides */
2708 start_ptr = cur_ptr;
2709 fieldvals[fieldno] = output_ptr;
2711 /* Scan data for field */
2717 if (cur_ptr >= line_end_ptr)
2727 if (cur_ptr >= line_end_ptr)
/* Octal escape: each further octal digit is folded into val, 3 bits at a time. */
2745 if (cur_ptr < line_end_ptr)
2751 val = (val << 3) + OCTVALUE(c);
2752 if (cur_ptr < line_end_ptr)
2758 val = (val << 3) + OCTVALUE(c);
/* Hex escape: each hex digit is folded into val, 4 bits at a time. */
2768 if (cur_ptr < line_end_ptr)
2770 char hexchar = *cur_ptr;
2772 if (isxdigit((unsigned char) hexchar))
2774 int val = GetDecimalFromHex(hexchar);
2777 if (cur_ptr < line_end_ptr)
2780 if (isxdigit((unsigned char) hexchar))
2783 val = (val << 4) + GetDecimalFromHex(hexchar);
2810 * in all other cases, take the char after '\'
2816 /* Add c to output string */
2820 /* Terminate attribute value in output area */
2821 *output_ptr++ = '\0';
2823 /* Check whether raw input matched null marker */
2824 input_len = end_ptr - start_ptr;
2825 if (input_len == cstate->null_print_len &&
2826 strncmp(start_ptr, cstate->null_print, input_len) == 0)
2827 fieldvals[fieldno] = NULL;
2830 /* Done if we hit EOL instead of a delim */
2835 /* Clean up state of attribute_buf */
2837 Assert(*output_ptr == '\0');
2838 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
2844 * Parse the current line into separate attributes (fields),
2845 * performing de-escaping as needed. This has exactly the same API as
2846 * CopyReadAttributesText, except we parse the fields according to
2847 * "standard" (i.e. common) CSV usage.
/*
 * CopyReadAttributesCSV: split the CSV-format line held in
 * cstate->line_buf into fields.
 *
 * Visible behavior: delimiter, quote and escape are the first bytes of
 * cstate->delim, cstate->quote and cstate->escape.  Field text is written
 * into cstate->attribute_buf and fieldvals[k] points at it, or is set to
 * NULL when the field was never quoted (saw_quote is false) and its raw
 * text equals cstate->null_print.  Raises ERRCODE_BAD_COPY_FILE_FORMAT for
 * too many fields ("extra data after last expected column") and for a
 * quote left open at end of line ("unterminated CSV quoted field").
 *
 * NOTE(review): this listing omits many short lines (declarations, braces,
 * break statements, flag assignments), so the control flow shown below is
 * incomplete; consult the full source before modifying.
 */
2850 CopyReadAttributesCSV(CopyState cstate, int maxfields, char **fieldvals)
2852 char delimc = cstate->delim[0];
2853 char quotec = cstate->quote[0];
2854 char escapec = cstate->escape[0];
2861 * We need a special case for zero-column tables: check that the input
2862 * line is empty, and return.
2866 if (cstate->line_buf.len != 0)
2868 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2869 errmsg("extra data after last expected column")));
2873 resetStringInfo(&cstate->attribute_buf);
2876 * The de-escaped attributes will certainly not be longer than the input
2877 * data line, so we can just force attribute_buf to be large enough and
2878 * then transfer data without any checks for enough space. We need to do
2879 * it this way because enlarging attribute_buf mid-stream would invalidate
2880 * pointers already stored into fieldvals[].
2882 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
2883 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
2884 output_ptr = cstate->attribute_buf.data;
2886 /* set pointer variables for loop */
2887 cur_ptr = cstate->line_buf.data;
2888 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
2890 /* Outer loop iterates over fields */
/* Per-field state: all three flags start out false for every field. */
2894 bool found_delim = false;
2895 bool in_quote = false;
2896 bool saw_quote = false;
2901 /* Make sure space remains in fieldvals[] */
2902 if (fieldno >= maxfields)
2904 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2905 errmsg("extra data after last expected column")));
2907 /* Remember start of field on both input and output sides */
2908 start_ptr = cur_ptr;
2909 fieldvals[fieldno] = output_ptr;
2911 /* Scan data for field */
2917 if (cur_ptr >= line_end_ptr)
2920 /* unquoted field delimiter */
2921 if (c == delimc && !in_quote)
2926 /* start of quoted field (or part of field) */
2927 if (c == quotec && !in_quote)
2933 /* escape within a quoted field */
2934 if (c == escapec && in_quote)
2937 * peek at the next char if available, and escape it if it is
2938 * an escape char or a quote char
2940 if (cur_ptr < line_end_ptr)
2942 char nextc = *cur_ptr;
2944 if (nextc == escapec || nextc == quotec)
2946 *output_ptr++ = nextc;
2954 * end of quoted field. Must do this test after testing for escape
2955 * in case quote char and escape char are the same (which is the
2958 if (c == quotec && in_quote)
2964 /* Add c to output string */
2968 /* Terminate attribute value in output area */
2969 *output_ptr++ = '\0';
2971 /* Shouldn't still be in quote mode */
2974 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2975 errmsg("unterminated CSV quoted field")));
2977 /* Check whether raw input matched null marker */
2978 input_len = end_ptr - start_ptr;
2979 if (!saw_quote && input_len == cstate->null_print_len &&
2980 strncmp(start_ptr, cstate->null_print, input_len) == 0)
2981 fieldvals[fieldno] = NULL;
2984 /* Done if we hit EOL instead of a delim */
2989 /* Clean up state of attribute_buf */
2991 Assert(*output_ptr == '\0');
2992 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
2999 * Read a binary attribute
/*
 * CopyReadBinaryAttribute: read one binary-format field and convert it
 * with the column type's receive function (flinfo).
 *
 * Visible behavior: a 32-bit field size is read with CopyGetInt32; one
 * path hands a NULL buffer to ReceiveFunctionCall, and one rejects the size
 * with "invalid field size".  Otherwise fld_size bytes are loaded into
 * cstate->attribute_buf, NUL-terminated, and passed to ReceiveFunctionCall;
 * it is an error if the receive function does not consume the whole buffer.
 * Short reads raise "unexpected EOF in COPY data".
 *
 * NOTE(review): the conditions selecting the NULL-buffer and invalid-size
 * paths, the end of the parameter list, and the final return are not
 * visible in this listing; presumably a size of -1 marks a NULL field and
 * other negative sizes are invalid -- confirm against the full source.
 */
3002 CopyReadBinaryAttribute(CopyState cstate,
3003 int column_no, FmgrInfo *flinfo,
3004 Oid typioparam, int32 typmod,
3010 if (!CopyGetInt32(cstate, &fld_size))
3012 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3013 errmsg("unexpected EOF in COPY data")));
3017 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
3021 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3022 errmsg("invalid field size")));
3024 /* reset attribute_buf to empty, and load raw data in it */
3025 resetStringInfo(&cstate->attribute_buf);
3027 enlargeStringInfo(&cstate->attribute_buf, fld_size);
3028 if (CopyGetData(cstate, cstate->attribute_buf.data,
3029 fld_size, fld_size) != fld_size)
3031 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3032 errmsg("unexpected EOF in COPY data")));
/* Record the length and add a terminating NUL after the raw bytes. */
3034 cstate->attribute_buf.len = fld_size;
3035 cstate->attribute_buf.data[fld_size] = '\0';
3037 /* Call the column type's binary input converter */
3038 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
3039 typioparam, typmod);
3041 /* Trouble if it didn't eat the whole buffer */
3042 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
3044 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
3045 errmsg("incorrect binary data format")));
3052 * Send text representation of one attribute, with conversion and escaping
/*
 * DUMPSOFAR() sends the pending literal run [start, ptr) with CopySendData.
 *
 * CopyAttributeOutText writes one attribute value in COPY text format.
 * Visible behavior: when cstate->need_transcoding is set the string is
 * first converted with pg_server_to_client; the result is then scanned byte
 * by byte, emitting backslash sequences (\b \f \n \r \t \v \\) through
 * CopySendString and a bare backslash through CopySendChar before a
 * character that is then sent as part of the next literal run.  When
 * cstate->encoding_embeds_ascii is set, multibyte characters are stepped
 * over as a unit using pg_encoding_mblen.
 *
 * NOTE(review): the macro body, the switch labels choosing each escape, and
 * the final flush are not visible in this listing; confirm against the full
 * source before modifying.
 */
3054 #define DUMPSOFAR() \
3057 CopySendData(cstate, start, ptr - start); \
3061 CopyAttributeOutText(CopyState cstate, char *string)
3066 char delimc = cstate->delim[0];
3068 if (cstate->need_transcoding)
3069 ptr = pg_server_to_client(string, strlen(string));
3074 * We have to grovel through the string searching for control characters
3075 * and instances of the delimiter character. In most cases, though, these
3076 * are infrequent. To avoid overhead from calling CopySendData once per
3077 * character, we dump out all characters between replaceable characters in
3078 * a single call. The loop invariant is that the data from "start" to
3079 * "ptr" can be sent literally, but hasn't yet been.
3082 while ((c = *ptr) != '\0')
3088 CopySendString(cstate, "\\b");
3093 CopySendString(cstate, "\\f");
3098 CopySendString(cstate, "\\n");
3103 CopySendString(cstate, "\\r");
3108 CopySendString(cstate, "\\t");
3113 CopySendString(cstate, "\\v");
3118 CopySendString(cstate, "\\\\");
3125 CopySendChar(cstate, '\\');
3126 start = ptr; /* we include char in next run */
3130 * We can skip pg_encoding_mblen() overhead when encoding is
3131 * safe, because in valid backend encodings, extra bytes of a
3132 * multibyte character never look like ASCII.
3134 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3135 ptr += pg_encoding_mblen(cstate->client_encoding, ptr);
3146 * Send text representation of one attribute, with conversion and
3147 * CSV-style escaping
/*
 * CopyAttributeOutCSV writes one attribute value in COPY CSV format.
 *
 * Visible behavior: quoting is considered when the caller passes use_quote,
 * when the untranscoded string equals cstate->null_print, when single_attr
 * is set and the value is exactly "\.", or when a preliminary scan finds
 * the delimiter, the quote character, a newline or a carriage return.  In
 * the quoted path the value is wrapped in quotec and every quotec or
 * escapec inside it is preceded by escapec; the unquoted path sends the
 * string as-is with CopySendString.  Transcoding via pg_server_to_client
 * happens after the null_print comparison.
 *
 * NOTE(review): assignments to use_quote and the branch structure between
 * the quoted and unquoted paths are not visible in this listing; confirm
 * against the full source before modifying.
 */
3150 CopyAttributeOutCSV(CopyState cstate, char *string,
3151 bool use_quote, bool single_attr)
3156 char delimc = cstate->delim[0];
3157 char quotec = cstate->quote[0];
3158 char escapec = cstate->escape[0];
3160 /* force quoting if it matches null_print (before conversion!) */
3161 if (!use_quote && strcmp(string, cstate->null_print) == 0)
3164 if (cstate->need_transcoding)
3165 ptr = pg_server_to_client(string, strlen(string));
3170 * Make a preliminary pass to discover if it needs quoting
3175 * Because '\.' can be a data value, quote it if it appears alone on a
3176 * line so it is not interpreted as the end-of-data marker.
3178 if (single_attr && strcmp(ptr, "\\.") == 0)
3184 while ((c = *tptr) != '\0')
3186 if (c == delimc || c == quotec || c == '\n' || c == '\r')
3191 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3192 tptr += pg_encoding_mblen(cstate->client_encoding, tptr);
3201 CopySendChar(cstate, quotec);
3204 * We adopt the same optimization strategy as in CopyAttributeOutText
3207 while ((c = *ptr) != '\0')
3209 if (c == quotec || c == escapec)
3212 CopySendChar(cstate, escapec);
3213 start = ptr; /* we include char in next run */
3215 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3216 ptr += pg_encoding_mblen(cstate->client_encoding, ptr);
3222 CopySendChar(cstate, quotec);
3226 /* If it doesn't need quoting, we can just dump it as-is */
3227 CopySendString(cstate, ptr);
3232 * CopyGetAttnums - build an integer list of attnums to be copied
3234 * The input attnamelist is either the user-specified column list,
3235 * or NIL if there was none (in which case we want all the non-dropped
3238 * rel can be NULL ... it's only used for error reports.
3241 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
3243 List *attnums = NIL;
3245 if (attnamelist == NIL)
3247 /* Generate default column list */
3248 Form_pg_attribute *attr = tupDesc->attrs;
3249 int attr_count = tupDesc->natts;
3252 for (i = 0; i < attr_count; i++)
3254 if (attr[i]->attisdropped)
3256 attnums = lappend_int(attnums, i + 1);
3261 /* Validate the user-supplied list and extract attnums */
3264 foreach(l, attnamelist)
3266 char *name = strVal(lfirst(l));
3270 /* Lookup column name */
3271 attnum = InvalidAttrNumber;
3272 for (i = 0; i < tupDesc->natts; i++)
3274 if (tupDesc->attrs[i]->attisdropped)
3276 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
3278 attnum = tupDesc->attrs[i]->attnum;
3282 if (attnum == InvalidAttrNumber)
3286 (errcode(ERRCODE_UNDEFINED_COLUMN),
3287 errmsg("column \"%s\" of relation \"%s\" does not exist",
3288 name, RelationGetRelationName(rel))));
3291 (errcode(ERRCODE_UNDEFINED_COLUMN),
3292 errmsg("column \"%s\" does not exist",
3295 /* Check for duplicates */
3296 if (list_member_int(attnums, attnum))
3298 (errcode(ERRCODE_DUPLICATE_COLUMN),
3299 errmsg("column \"%s\" specified more than once",
3301 attnums = lappend_int(attnums, attnum);
3310 * copy_dest_startup --- executor startup
/*
 * Installed as pub.rStartup by CreateCopyDestReceiver.
 * NOTE(review): only the signature is visible in this listing; the return
 * type and body are not shown, so confirm against the full source.
 */
3313 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
3319 * copy_dest_receive --- receive one tuple
3322 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
3324 DR_copy *myState = (DR_copy *) self;
3325 CopyState cstate = myState->cstate;
3327 /* Make sure the tuple is fully deconstructed */
3328 slot_getallattrs(slot);
3330 /* And send the data */
3331 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
3335 * copy_dest_shutdown --- executor end
/*
 * Installed as pub.rShutdown by CreateCopyDestReceiver.
 * NOTE(review): only the signature is visible in this listing; the return
 * type and body are not shown, so confirm against the full source.
 */
3338 copy_dest_shutdown(DestReceiver *self)
3344 * copy_dest_destroy --- release DestReceiver object
/*
 * Installed as pub.rDestroy by CreateCopyDestReceiver, which allocates the
 * receiver object with palloc.
 * NOTE(review): only the signature is visible in this listing; presumably
 * the body frees self -- confirm against the full source.
 */
3347 copy_dest_destroy(DestReceiver *self)
3353 * CreateCopyDestReceiver -- create a suitable DestReceiver object
3356 CreateCopyDestReceiver(void)
3358 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
3360 self->pub.receiveSlot = copy_dest_receive;
3361 self->pub.rStartup = copy_dest_startup;
3362 self->pub.rShutdown = copy_dest_shutdown;
3363 self->pub.rDestroy = copy_dest_destroy;
3364 self->pub.mydest = DestCopyOut;
3366 self->cstate = NULL; /* will be set later */
3368 return (DestReceiver *) self;