1 /*-------------------------------------------------------------------------
4 * Implements the COPY utility command
6 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.311 2009/06/03 15:06:48 tgl Exp $
13 *-------------------------------------------------------------------------
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
23 #include "access/heapam.h"
24 #include "access/xact.h"
25 #include "catalog/namespace.h"
26 #include "catalog/pg_type.h"
27 #include "commands/copy.h"
28 #include "commands/trigger.h"
29 #include "executor/executor.h"
30 #include "libpq/libpq.h"
31 #include "libpq/pqformat.h"
32 #include "mb/pg_wchar.h"
33 #include "miscadmin.h"
34 #include "optimizer/planner.h"
35 #include "parser/parse_relation.h"
36 #include "rewrite/rewriteHandler.h"
37 #include "storage/fd.h"
38 #include "tcop/tcopprot.h"
39 #include "utils/acl.h"
40 #include "utils/builtins.h"
41 #include "utils/lsyscache.h"
42 #include "utils/memutils.h"
43 #include "utils/snapmgr.h"
46 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
47 #define OCTVALUE(c) ((c) - '0')
50 * Represents the different source/dest cases we need to worry about at
55 COPY_FILE, /* to/from file */
56 COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
57 COPY_NEW_FE /* to/from frontend (3.0 protocol) */
61 * Represents the end-of-line terminator type of the input
72 * This struct contains all the state variables used throughout a COPY
73 * operation. For simplicity, we use the same struct for all variants of COPY,
74 * even though some fields are used in only some cases.
76 * Multi-byte encodings: all supported client-side encodings encode multi-byte
77 * characters by having the first byte's high bit set. Subsequent bytes of the
78 * character can have the high bit not set. When scanning data in such an
79 * encoding to look for a match to a single-byte (ie ASCII) character, we must
80 * use the full pg_encoding_mblen() machinery to skip over multibyte
81 * characters, else we might find a false match to a trailing byte. In
82 * supported server encodings, there is no possibility of a false match, and
83 * it's faster to make useless comparisons to trailing bytes than it is to
84 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
85 * when we have to do it the hard way.
/*
 * Field groups of CopyStateData, in declaration order:
 *   - low-level source/destination state (copy_dest .. processed)
 *   - options taken from the COPY command (rel .. force_notnull_flags)
 *   - values kept only so error messages can name the relation, line,
 *     attribute and attribute value being processed (cur_*)
 *   - working state for COPY TO (out_functions, rowcontext)
 *   - the attribute_buf / line_buf / raw_buf buffers used by textual
 *     COPY FROM, plus the raw_buf read position and fill level
 */
87 typedef struct CopyStateData
89 /* low-level state data */
90 CopyDest copy_dest; /* type of copy source/destination */
91 FILE *copy_file; /* used if copy_dest == COPY_FILE */
92 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
93 * dest == COPY_NEW_FE in COPY FROM */
94 bool fe_copy; /* true for all FE copy dests */
95 bool fe_eof; /* true if detected end of copy data */
96 EolType eol_type; /* EOL type of input */
97 int client_encoding; /* remote side's character encoding */
98 bool need_transcoding; /* client encoding diff from server? */
99 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
100 uint64 processed; /* # of tuples processed */
102 /* parameters from the COPY command */
103 Relation rel; /* relation to copy to or from */
104 QueryDesc *queryDesc; /* executable query to copy from */
105 List *attnumlist; /* integer list of attnums to copy */
106 char *filename; /* filename, or NULL for STDIN/STDOUT */
107 bool binary; /* binary format? */
108 bool oids; /* include OIDs? */
109 bool csv_mode; /* Comma Separated Value format? */
110 bool header_line; /* CSV header line? */
111 char *null_print; /* NULL marker string (server encoding!) */
112 int null_print_len; /* length of same */
113 char *null_print_client; /* same converted to client encoding */
114 char *delim; /* column delimiter (must be 1 byte) */
115 char *quote; /* CSV quote char (must be 1 byte) */
116 char *escape; /* CSV escape char (must be 1 byte) */
117 bool *force_quote_flags; /* per-column CSV FQ flags */
118 bool *force_notnull_flags; /* per-column CSV FNN flags */
120 /* these are just for error messages, see copy_in_error_callback */
121 const char *cur_relname; /* table name for error messages */
122 int cur_lineno; /* line number for error messages */
123 const char *cur_attname; /* current att for error messages */
124 const char *cur_attval; /* current att value for error messages */
127 * Working state for COPY TO
129 FmgrInfo *out_functions; /* lookup info for output functions */
130 MemoryContext rowcontext; /* per-row evaluation context */
133 * These variables are used to reduce overhead in textual COPY FROM.
135 * attribute_buf holds the separated, de-escaped text for each field of
136 * the current line. The CopyReadAttributes functions return arrays of
137 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
138 * the buffer on each cycle.
140 StringInfoData attribute_buf;
143 * Similarly, line_buf holds the whole input line being processed. The
144 * input cycle is first to read the whole line into line_buf, convert it
145 * to server encoding there, and then extract the individual attribute
146 * fields into attribute_buf. line_buf is preserved unmodified so that we
147 * can display it in error messages if appropriate.
149 StringInfoData line_buf;
150 bool line_buf_converted; /* converted to server encoding? */
153 * Finally, raw_buf holds raw data read from the data source (file or
154 * client connection). CopyReadLine parses this data sufficiently to
155 * locate line boundaries, then transfers the data to line_buf and
156 * converts it. Note: we guarantee that there is a \0 at
157 * raw_buf[raw_buf_len].
159 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
161 int raw_buf_index; /* next byte to process */
162 int raw_buf_len; /* total # of bytes stored */
165 typedef CopyStateData *CopyState;
167 /* DestReceiver for COPY (SELECT) TO */
170 DestReceiver pub; /* publicly-known function pointers */
171 CopyState cstate; /* CopyStateData for the command */
176 * These macros centralize code used to process line_buf and raw_buf buffers.
177 * They are macros because they often do continue/break control and to avoid
178 * function call overhead in tight COPY loops.
180 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
181 * prevent the continue/break processing from working. We end the "if (1)"
182 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
183 * any "else" in the calling code, and to avoid any compiler warnings about
184 * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
188 * This keeps the character read at the top of the loop in the buffer
189 * even if there is more than one read-ahead.
191 #define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
194 if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
196 raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
202 /* This consumes the remainder of the buffer and breaks */
203 #define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
206 if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
209 raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
210 /* backslash just before EOF, treat as data char */ \
217 * Transfer any approved data to line_buf; must do this to be sure
218 * there is some room in raw_buf.
220 #define REFILL_LINEBUF \
223 if (raw_buf_ptr > cstate->raw_buf_index) \
225 appendBinaryStringInfo(&cstate->line_buf, \
226 cstate->raw_buf + cstate->raw_buf_index, \
227 raw_buf_ptr - cstate->raw_buf_index); \
228 cstate->raw_buf_index = raw_buf_ptr; \
232 /* Undo any read-ahead and jump out of the block. */
233 #define NO_END_OF_COPY_GOTO \
236 raw_buf_ptr = prev_raw_ptr + 1; \
237 goto not_end_of_copy; \
240 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
243 /* non-export function prototypes */
244 static void DoCopyTo(CopyState cstate);
245 static void CopyTo(CopyState cstate);
246 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
247 Datum *values, bool *nulls);
248 static void CopyFrom(CopyState cstate);
249 static bool CopyReadLine(CopyState cstate);
250 static bool CopyReadLineText(CopyState cstate);
251 static int CopyReadAttributesText(CopyState cstate, int maxfields,
253 static int CopyReadAttributesCSV(CopyState cstate, int maxfields,
255 static Datum CopyReadBinaryAttribute(CopyState cstate,
256 int column_no, FmgrInfo *flinfo,
257 Oid typioparam, int32 typmod,
259 static void CopyAttributeOutText(CopyState cstate, char *string);
260 static void CopyAttributeOutCSV(CopyState cstate, char *string,
261 bool use_quote, bool single_attr);
262 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
264 static char *limit_printout_length(const char *str);
266 /* Low-level communications functions */
267 static void SendCopyBegin(CopyState cstate);
268 static void ReceiveCopyBegin(CopyState cstate);
269 static void SendCopyEnd(CopyState cstate);
270 static void CopySendData(CopyState cstate, void *databuf, int datasize);
271 static void CopySendString(CopyState cstate, const char *str);
272 static void CopySendChar(CopyState cstate, char c);
273 static void CopySendEndOfRow(CopyState cstate);
274 static int CopyGetData(CopyState cstate, void *databuf,
275 int minread, int maxread);
276 static void CopySendInt32(CopyState cstate, int32 val);
277 static bool CopyGetInt32(CopyState cstate, int32 *val);
278 static void CopySendInt16(CopyState cstate, int16 val);
279 static bool CopyGetInt16(CopyState cstate, int16 *val);
283 * Send copy start/stop messages for frontend copies. These have changed
284 * in past protocol redesigns.
/*
 * SendCopyBegin: announce to the frontend that COPY output is starting and
 * record in cstate->copy_dest which protocol flavour is in use.
 *
 * The message depends on the frontend protocol major version:
 *   - >= 3: an 'H' message carrying the overall format code (1 when
 *     cstate->binary is set, otherwise 0), the number of entries in
 *     cstate->attnumlist as a 2-byte count, and the same format code once
 *     per column; copy_dest becomes COPY_NEW_FE.
 *   - >= 2: an empty 'H' message; copy_dest becomes COPY_OLD_FE.
 *   - otherwise: an empty 'B' message; copy_dest becomes COPY_OLD_FE.
 *
 * Both old-protocol branches contain a FEATURE_NOT_SUPPORTED report saying
 * COPY BINARY is not supported to stdout or from stdin.
 * NOTE(review): the condition guarding those reports is not visible here --
 * presumably cstate->binary; confirm.
 */
287 SendCopyBegin(CopyState cstate)
289 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
/* one format code is used for the whole copy and for every column */
293 int natts = list_length(cstate->attnumlist);
294 int16 format = (cstate->binary ? 1 : 0);
297 pq_beginmessage(&buf, 'H');
298 pq_sendbyte(&buf, format); /* overall format */
299 pq_sendint(&buf, natts, 2);
300 for (i = 0; i < natts; i++)
301 pq_sendint(&buf, format, 2); /* per-column formats */
303 cstate->copy_dest = COPY_NEW_FE;
305 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
310 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
311 errmsg("COPY BINARY is not supported to stdout or from stdin")));
312 pq_putemptymessage('H');
313 /* grottiness needed for old COPY OUT protocol */
315 cstate->copy_dest = COPY_OLD_FE;
322 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
323 errmsg("COPY BINARY is not supported to stdout or from stdin")));
324 pq_putemptymessage('B');
325 /* grottiness needed for old COPY OUT protocol */
327 cstate->copy_dest = COPY_OLD_FE;
/*
 * ReceiveCopyBegin: tell the frontend that the backend is ready to receive
 * COPY data, and record the protocol flavour in cstate->copy_dest.
 *
 * The message depends on the frontend protocol major version:
 *   - >= 3: a 'G' message carrying the overall format code (1 when
 *     cstate->binary is set, otherwise 0), the number of entries in
 *     cstate->attnumlist as a 2-byte count, and the same format code once
 *     per column; copy_dest becomes COPY_NEW_FE and a fresh fe_msgbuf is
 *     created with makeStringInfo().
 *   - >= 2: an empty 'G' message; copy_dest becomes COPY_OLD_FE.
 *   - otherwise: an empty 'D' message; copy_dest becomes COPY_OLD_FE.
 *
 * Both old-protocol branches contain a FEATURE_NOT_SUPPORTED report saying
 * COPY BINARY is not supported to stdout or from stdin.
 * NOTE(review): the condition guarding those reports and the flush call
 * announced by the final comment are not visible here; confirm.
 */
332 ReceiveCopyBegin(CopyState cstate)
334 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
338 int natts = list_length(cstate->attnumlist);
339 int16 format = (cstate->binary ? 1 : 0);
342 pq_beginmessage(&buf, 'G');
343 pq_sendbyte(&buf, format); /* overall format */
344 pq_sendint(&buf, natts, 2);
345 for (i = 0; i < natts; i++)
346 pq_sendint(&buf, format, 2); /* per-column formats */
348 cstate->copy_dest = COPY_NEW_FE;
/* only the new-protocol path allocates a message buffer here */
349 cstate->fe_msgbuf = makeStringInfo();
351 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
356 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
357 errmsg("COPY BINARY is not supported to stdout or from stdin")));
358 pq_putemptymessage('G');
359 cstate->copy_dest = COPY_OLD_FE;
366 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
367 errmsg("COPY BINARY is not supported to stdout or from stdin")));
368 pq_putemptymessage('D');
369 cstate->copy_dest = COPY_OLD_FE;
371 /* We *must* flush here to ensure FE knows it can send. */
/*
 * SendCopyEnd: finish a COPY to the frontend.
 *
 * When copy_dest is COPY_NEW_FE, fe_msgbuf must already be empty (asserted)
 * and an empty 'c' (Copy Done) message is sent.
 *
 * The remaining statements queue the two-byte "\." trailer, flush it with
 * CopySendEndOfRow() and call pq_endcopyout(false).
 * NOTE(review): the branch keyword separating the two paths is not visible
 * here -- presumably the trailer is sent only for the non-COPY_NEW_FE case;
 * confirm.
 */
376 SendCopyEnd(CopyState cstate)
378 if (cstate->copy_dest == COPY_NEW_FE)
380 /* Shouldn't have any unsent data */
381 Assert(cstate->fe_msgbuf->len == 0);
382 /* Send Copy Done message */
383 pq_putemptymessage('c');
/* "\\." in C source is the two characters backslash and dot */
387 CopySendData(cstate, "\\.", 2);
388 /* Need to flush out the trailer (this also appends a newline) */
389 CopySendEndOfRow(cstate);
390 pq_endcopyout(false);
395 * CopySendData sends output data to the destination (file or frontend)
396 * CopySendString does the same for null-terminated strings
397 * CopySendChar does the same for single characters
398 * CopySendEndOfRow does the appropriate thing at end of each data row
399 * (data is not actually flushed except by CopySendEndOfRow)
401 * NB: no data conversion is applied by these functions
/*
 * CopySendData: append datasize bytes starting at databuf to
 * cstate->fe_msgbuf.  Nothing is written to the destination here; the
 * bytes are only accumulated in the buffer.
 */
405 CopySendData(CopyState cstate, void *databuf, int datasize)
407 appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize);
/*
 * CopySendString: append the null-terminated string str (without its
 * terminator, since the length comes from strlen) to cstate->fe_msgbuf.
 */
411 CopySendString(CopyState cstate, const char *str)
413 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
/*
 * CopySendChar: append the single character c to cstate->fe_msgbuf.
 */
417 CopySendChar(CopyState cstate, char c)
419 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
/*
 * CopySendEndOfRow: terminate the row accumulated in cstate->fe_msgbuf,
 * push it to the destination selected by cstate->copy_dest, and reset the
 * buffer for the next row.
 *
 * Visible handling per destination:
 *   - file: a platform-dependent line ending ("\n" or "\r\n") is appended,
 *     the buffer is written to cstate->copy_file with fwrite(), and a
 *     stream error is reported as "could not write to COPY file".
 *   - old-protocol frontend: "\n" is appended and the buffer is sent with
 *     pq_putbytes(); a failure is reported as a lost connection.
 *   - new-protocol frontend: "\n" is appended and the whole buffer is sent
 *     as one 'd' (CopyData) message.
 *
 * NOTE(review): the case labels, the platform #if lines and the conditions
 * guarding the newline appends are not visible here; the mapping of
 * statements to destinations above follows the adjacent comments. Confirm.
 */
423 CopySendEndOfRow(CopyState cstate)
425 StringInfo fe_msgbuf = cstate->fe_msgbuf;
427 switch (cstate->copy_dest)
432 /* Default line termination depends on platform */
434 CopySendChar(cstate, '\n');
436 CopySendString(cstate, "\r\n");
/* fwrite's result is discarded; failure is detected via ferror() below */
440 (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
441 1, cstate->copy_file);
442 if (ferror(cstate->copy_file))
444 (errcode_for_file_access(),
445 errmsg("could not write to COPY file: %m")));
448 /* The FE/BE protocol uses \n as newline for all platforms */
450 CopySendChar(cstate, '\n');
452 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
454 /* no hope of recovering connection sync, so FATAL */
456 (errcode(ERRCODE_CONNECTION_FAILURE),
457 errmsg("connection lost during COPY to stdout")));
461 /* The FE/BE protocol uses \n as newline for all platforms */
463 CopySendChar(cstate, '\n');
465 /* Dump the accumulated row as one CopyData message */
466 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
/* the buffer is emptied regardless of which destination was used */
470 resetStringInfo(fe_msgbuf);
474 * CopyGetData reads data from the source (file or frontend)
476 * We attempt to read at least minread, and at most maxread, bytes from
477 * the source. The actual number of bytes read is returned; if this is
478 * less than minread, EOF was detected.
480 * Note: when copying from the frontend, we expect a proper EOF mark per
481 * protocol; if the frontend simply drops the connection, we raise error.
482 * It seems unwise to allow the COPY IN to complete normally in that case.
484 * NB: no data conversion is applied here.
/*
 * CopyGetData: read raw bytes from the current COPY source into databuf.
 *
 * cstate  - supplies copy_dest, copy_file, fe_msgbuf and fe_eof.
 * databuf - destination buffer.
 * minread - number of bytes the caller needs.
 * maxread - upper bound on the number of bytes to store.
 *
 * Visible handling per source, dispatched on cstate->copy_dest:
 *   - file: a single fread() of up to maxread bytes; a stream error is
 *     reported as "could not read from COPY file".
 *   - old-protocol frontend: pq_getbytes() for exactly minread bytes; a
 *     failure is reported as an unexpected EOF on the client connection.
 *   - new-protocol frontend: messages are received into fe_msgbuf until
 *     minread bytes have been collected or a CopyDone ('c') message sets
 *     fe_eof; CopyFail ('f') and unrecognized message types raise errors.
 *
 * NOTE(review): the case labels, several declarations and the return
 * statement are not visible here; the mapping of statements to sources
 * above follows the adjacent comments. Confirm against the full function.
 */
487 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
491 switch (cstate->copy_dest)
494 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
495 if (ferror(cstate->copy_file))
497 (errcode_for_file_access(),
498 errmsg("could not read from COPY file: %m")));
503 * We cannot read more than minread bytes (which in practice is 1)
504 * because old protocol doesn't have any clear way of separating
505 * the COPY stream from following data. This is slow, but not any
506 * slower than the code path was originally, and we don't care
507 * much anymore about the performance of old protocol.
509 if (pq_getbytes((char *) databuf, minread))
511 /* Only a \. terminator is legal EOF in old protocol */
513 (errcode(ERRCODE_CONNECTION_FAILURE),
514 errmsg("unexpected EOF on client connection")));
/* keep going until the minimum is met, space runs out, or CopyDone arrives */
519 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
/* the current message is exhausted once cursor has reached len */
523 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
525 /* Try to receive another message */
529 mtype = pq_getbyte();
532 (errcode(ERRCODE_CONNECTION_FAILURE),
533 errmsg("unexpected EOF on client connection")));
534 if (pq_getmessage(cstate->fe_msgbuf, 0))
536 (errcode(ERRCODE_CONNECTION_FAILURE),
537 errmsg("unexpected EOF on client connection")));
540 case 'd': /* CopyData */
542 case 'c': /* CopyDone */
543 /* COPY IN correctly terminated by frontend */
544 cstate->fe_eof = true;
546 case 'f': /* CopyFail */
548 (errcode(ERRCODE_QUERY_CANCELED),
549 errmsg("COPY from stdin failed: %s",
550 pq_getmsgstring(cstate->fe_msgbuf))));
552 case 'H': /* Flush */
556 * Ignore Flush/Sync for the convenience of client
557 * libraries (such as libpq) that may send those
558 * without noticing that the command they just
564 (errcode(ERRCODE_PROTOCOL_VIOLATION),
565 errmsg("unexpected message type 0x%02X during COPY from stdin",
/* bytes still unread in the current message */
570 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
573 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
/* advance the output position past the bytes just copied */
574 databuf = (void *) ((char *) databuf + avail);
586 * These functions do apply some data conversion
590 * CopySendInt32 sends an int32 in network byte order
/*
 * CopySendInt32: convert val to network byte order with htonl() and queue
 * the resulting sizeof(buf) bytes via CopySendData().
 * NOTE(review): buf's declaration is not visible here -- presumably a
 * uint32; confirm.
 */
593 CopySendInt32(CopyState cstate, int32 val)
597 buf = htonl((uint32) val);
598 CopySendData(cstate, &buf, sizeof(buf));
602 * CopyGetInt32 reads an int32 that appears in network byte order
604 * Returns true if OK, false if EOF
/*
 * CopyGetInt32: read sizeof(buf) bytes with CopyGetData() (minimum and
 * maximum both set to that size) and store the value, converted from
 * network byte order with ntohl(), in *val.
 *
 * If fewer bytes than requested were obtained, *val is set to 0 instead.
 * NOTE(review): the return statements are not visible here; per the
 * preceding comment the result is true if OK and false on EOF. Confirm.
 */
607 CopyGetInt32(CopyState cstate, int32 *val)
611 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
613 *val = 0; /* suppress compiler warning */
616 *val = (int32) ntohl(buf);
621 * CopySendInt16 sends an int16 in network byte order
/*
 * CopySendInt16: convert val to network byte order with htons() and queue
 * the resulting sizeof(buf) bytes via CopySendData().
 * NOTE(review): buf's declaration is not visible here -- presumably a
 * uint16; confirm.
 */
624 CopySendInt16(CopyState cstate, int16 val)
628 buf = htons((uint16) val);
629 CopySendData(cstate, &buf, sizeof(buf));
633 * CopyGetInt16 reads an int16 that appears in network byte order
/*
 * CopyGetInt16: read sizeof(buf) bytes with CopyGetData() (minimum and
 * maximum both set to that size) and store the value, converted from
 * network byte order with ntohs(), in *val.
 *
 * If fewer bytes than requested were obtained, *val is set to 0 instead.
 * NOTE(review): the return statements are not visible here -- presumably
 * true on success and false on EOF, mirroring CopyGetInt32; confirm.
 */
636 CopyGetInt16(CopyState cstate, int16 *val)
640 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
642 *val = 0; /* suppress compiler warning */
645 *val = (int16) ntohs(buf);
651 * CopyLoadRawBuf loads some more data into raw_buf
653 * Returns TRUE if able to obtain at least one more byte, else FALSE.
655 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
656 * down to the start of the buffer and then we load more data after that.
657 * This case is used only when a frontend multibyte character crosses a
658 * bufferload boundary.
/*
 * CopyLoadRawBuf: refill cstate->raw_buf from the data source.
 *
 * Any bytes between raw_buf_index and raw_buf_len are first moved to the
 * front of the buffer; CopyGetData() is then asked for at least 1 and at
 * most RAW_BUF_SIZE - nbytes further bytes, stored right after them.
 * Afterwards the buffer is null-terminated at raw_buf[nbytes],
 * raw_buf_index is reset to 0 and raw_buf_len is set to nbytes.
 *
 * Returns whether CopyGetData() delivered at least one new byte.
 *
 * NOTE(review): the final argument of the memmove call and any statement
 * adding inbytes to nbytes are not visible here; presumably nbytes counts
 * both saved and newly read bytes by the time it is stored. Confirm.
 */
661 CopyLoadRawBuf(CopyState cstate)
666 if (cstate->raw_buf_index < cstate->raw_buf_len)
668 /* Copy down the unprocessed data */
669 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
670 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
674 nbytes = 0; /* no data need be saved */
/* new data lands immediately after whatever was kept */
676 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
677 1, RAW_BUF_SIZE - nbytes);
679 cstate->raw_buf[nbytes] = '\0';
680 cstate->raw_buf_index = 0;
681 cstate->raw_buf_len = nbytes;
682 return (inbytes > 0);
687 * DoCopy executes the SQL COPY statement
689 * Either unload or reload contents of table <relation>, depending on <from>.
690 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
691 * we also support copying the output of an arbitrary SELECT query.
693 * If <pipe> is false, transfer is between the table and the file named
694 * <filename>. Otherwise, transfer is between the table and our regular
695 * input/output stream. The latter could be either stdin/stdout or a
696 * socket, depending on whether we're running under Postmaster control.
698 * Iff <binary>, unload or reload in the binary format, as opposed to the
699 * more wasteful but more robust and portable text format.
701 * Iff <oids>, unload or reload the format that includes OID information.
702 * On input, we accept OIDs whether or not the table has an OID column,
703 * but silently drop them if it does not. On output, we report an error
704 * if the user asks for OIDs in a table that has none (not providing an
705 * OID column might seem friendlier, but could seriously confuse programs).
707 * If in the text format, delimit columns with delimiter <delim> and print
708 * NULL values as <null_print>.
710 * Do not allow a Postgres user without superuser privilege to read from
711 * or write to a file.
713 * Do not allow the copy if user doesn't have proper permission to access
714 * the table or the specifically requested columns.
/*
 * DoCopy: entry point for executing a COPY statement.
 *
 * stmt        - the CopyStmt to execute; is_from, filename, attlist,
 *               options, relation and query are read from it here.
 * queryString - source text, passed to pg_analyze_and_rewrite() and
 *               CreateQueryDesc() for the COPY (SELECT ...) form.
 *
 * Outline of the visible steps:
 *   1. Allocate a zeroed CopyStateData and fill it from stmt->options;
 *      a repeated option is reported as "conflicting or redundant options".
 *   2. Cross-check the options and apply defaults: delimiter "," (CSV) or
 *      tab, null string "" (CSV) or \N, and in CSV mode quote '"' and
 *      escape equal to quote.
 *   3. Require superuser for COPY to or from a named file.
 *   4. Either open and permission-check the relation, or analyze, rewrite,
 *      plan and start the query for the COPY (SELECT ...) form.
 *   5. Build the attribute number list and the per-column FORCE QUOTE and
 *      FORCE NOT NULL flag arrays.
 *   6. Initialize the buffers and encoding state, then run CopyFrom() or
 *      DoCopyTo().
 *   7. Close the relation or shut the query down, and free the buffers.
 *
 * NOTE(review): many guarding conditions, error-report openers and the
 * return statement are not visible here. cstate->processed is copied into
 * a local before cleanup, presumably so it can be returned; confirm.
 */
717 DoCopy(const CopyStmt *stmt, const char *queryString)
720 bool is_from = stmt->is_from;
/* no file name means the data travels over the regular input/output stream */
721 bool pipe = (stmt->filename == NULL);
722 List *attnamelist = stmt->attlist;
723 List *force_quote = NIL;
724 List *force_notnull = NIL;
/* COPY FROM needs INSERT privilege, COPY TO needs SELECT privilege */
725 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
727 AclMode remainingPerms;
733 /* Allocate workspace and zero all fields */
734 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
736 /* Extract options from the statement node tree */
737 foreach(option, stmt->options)
739 DefElem *defel = (DefElem *) lfirst(option);
741 if (strcmp(defel->defname, "binary") == 0)
745 (errcode(ERRCODE_SYNTAX_ERROR),
746 errmsg("conflicting or redundant options")));
747 cstate->binary = intVal(defel->arg);
749 else if (strcmp(defel->defname, "oids") == 0)
753 (errcode(ERRCODE_SYNTAX_ERROR),
754 errmsg("conflicting or redundant options")));
755 cstate->oids = intVal(defel->arg);
757 else if (strcmp(defel->defname, "delimiter") == 0)
761 (errcode(ERRCODE_SYNTAX_ERROR),
762 errmsg("conflicting or redundant options")));
763 cstate->delim = strVal(defel->arg);
765 else if (strcmp(defel->defname, "null") == 0)
767 if (cstate->null_print)
769 (errcode(ERRCODE_SYNTAX_ERROR),
770 errmsg("conflicting or redundant options")));
771 cstate->null_print = strVal(defel->arg);
773 else if (strcmp(defel->defname, "csv") == 0)
775 if (cstate->csv_mode)
777 (errcode(ERRCODE_SYNTAX_ERROR),
778 errmsg("conflicting or redundant options")));
779 cstate->csv_mode = intVal(defel->arg);
781 else if (strcmp(defel->defname, "header") == 0)
783 if (cstate->header_line)
785 (errcode(ERRCODE_SYNTAX_ERROR),
786 errmsg("conflicting or redundant options")));
787 cstate->header_line = intVal(defel->arg);
789 else if (strcmp(defel->defname, "quote") == 0)
793 (errcode(ERRCODE_SYNTAX_ERROR),
794 errmsg("conflicting or redundant options")));
795 cstate->quote = strVal(defel->arg);
797 else if (strcmp(defel->defname, "escape") == 0)
801 (errcode(ERRCODE_SYNTAX_ERROR),
802 errmsg("conflicting or redundant options")));
803 cstate->escape = strVal(defel->arg);
805 else if (strcmp(defel->defname, "force_quote") == 0)
809 (errcode(ERRCODE_SYNTAX_ERROR),
810 errmsg("conflicting or redundant options")));
811 force_quote = (List *) defel->arg;
813 else if (strcmp(defel->defname, "force_notnull") == 0)
817 (errcode(ERRCODE_SYNTAX_ERROR),
818 errmsg("conflicting or redundant options")));
819 force_notnull = (List *) defel->arg;
822 elog(ERROR, "option \"%s\" not recognized",
826 /* Check for incompatible options */
827 if (cstate->binary && cstate->delim)
829 (errcode(ERRCODE_SYNTAX_ERROR),
830 errmsg("cannot specify DELIMITER in BINARY mode")));
832 if (cstate->binary && cstate->csv_mode)
834 (errcode(ERRCODE_SYNTAX_ERROR),
835 errmsg("cannot specify CSV in BINARY mode")));
837 if (cstate->binary && cstate->null_print)
839 (errcode(ERRCODE_SYNTAX_ERROR),
840 errmsg("cannot specify NULL in BINARY mode")));
842 /* Set defaults for omitted options */
844 cstate->delim = cstate->csv_mode ? "," : "\t";
846 if (!cstate->null_print)
847 cstate->null_print = cstate->csv_mode ? "" : "\\N";
/* computed after defaulting, so it reflects the string actually in use */
848 cstate->null_print_len = strlen(cstate->null_print);
850 if (cstate->csv_mode)
853 cstate->quote = "\"";
855 cstate->escape = cstate->quote;
858 /* Only single-byte delimiter strings are supported. */
859 if (strlen(cstate->delim) != 1)
861 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
862 errmsg("COPY delimiter must be a single one-byte character")));
864 /* Disallow end-of-line characters */
865 if (strchr(cstate->delim, '\r') != NULL ||
866 strchr(cstate->delim, '\n') != NULL)
868 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
869 errmsg("COPY delimiter cannot be newline or carriage return")));
871 if (strchr(cstate->null_print, '\r') != NULL ||
872 strchr(cstate->null_print, '\n') != NULL)
874 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
875 errmsg("COPY null representation cannot use newline or carriage return")));
878 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
879 * backslash because it would be ambiguous. We can't allow the other
880 * cases because data characters matching the delimiter must be
881 * backslashed, and certain backslash combinations are interpreted
882 * non-literally by COPY IN. Disallowing all lower case ASCII letters
883 * is more than strictly necessary, but seems best for consistency and
884 * future-proofing. Likewise we disallow all digits though only octal
885 * digits are actually dangerous.
887 if (!cstate->csv_mode &&
888 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
889 cstate->delim[0]) != NULL)
891 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
892 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
895 if (!cstate->csv_mode && cstate->header_line)
897 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
898 errmsg("COPY HEADER available only in CSV mode")));
901 if (!cstate->csv_mode && cstate->quote != NULL)
903 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
904 errmsg("COPY quote available only in CSV mode")));
906 if (cstate->csv_mode && strlen(cstate->quote) != 1)
908 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
909 errmsg("COPY quote must be a single one-byte character")));
911 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
913 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
914 errmsg("COPY delimiter and quote must be different")));
917 if (!cstate->csv_mode && cstate->escape != NULL)
919 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
920 errmsg("COPY escape available only in CSV mode")));
922 if (cstate->csv_mode && strlen(cstate->escape) != 1)
924 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
925 errmsg("COPY escape must be a single one-byte character")));
927 /* Check force_quote */
928 if (!cstate->csv_mode && force_quote != NIL)
930 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
931 errmsg("COPY force quote available only in CSV mode")));
932 if (force_quote != NIL && is_from)
934 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
935 errmsg("COPY force quote only available using COPY TO")));
937 /* Check force_notnull */
938 if (!cstate->csv_mode && force_notnull != NIL)
940 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
941 errmsg("COPY force not null available only in CSV mode")));
942 if (force_notnull != NIL && !is_from)
944 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
945 errmsg("COPY force not null only available using COPY FROM")));
947 /* Don't allow the delimiter to appear in the null string. */
948 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
950 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
951 errmsg("COPY delimiter must not appear in the NULL specification")));
953 /* Don't allow the CSV quote char to appear in the null string. */
954 if (cstate->csv_mode &&
955 strchr(cstate->null_print, cstate->quote[0]) != NULL)
957 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
958 errmsg("CSV quote character must not appear in the NULL specification")));
960 /* Disallow file COPY except to superusers. */
961 if (!pipe && !superuser())
963 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
964 errmsg("must be superuser to COPY to or from a file"),
965 errhint("Anyone can COPY to stdout or from stdin. "
966 "psql's \\copy command also works for anyone.")));
/* relation form of COPY: there is no query, so queryDesc stays NULL */
970 Assert(!stmt->query);
971 cstate->queryDesc = NULL;
973 /* Open and lock the relation, using the appropriate lock type. */
974 cstate->rel = heap_openrv(stmt->relation,
975 (is_from ? RowExclusiveLock : AccessShareLock));
977 tupDesc = RelationGetDescr(cstate->rel);
979 /* Check relation permissions. */
980 relPerms = pg_class_aclmask(RelationGetRelid(cstate->rel), GetUserId(),
981 required_access, ACLMASK_ALL);
/* whatever part of required_access the table-level check did not grant */
982 remainingPerms = required_access & ~relPerms;
983 if (remainingPerms != 0)
985 /* We don't have table permissions, check per-column permissions */
989 attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
990 foreach(cur, attnums)
992 int attnum = lfirst_int(cur);
994 if (pg_attribute_aclcheck(RelationGetRelid(cstate->rel),
997 remainingPerms) != ACLCHECK_OK)
998 aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
999 RelationGetRelationName(cstate->rel));
1003 /* check read-only transaction */
1004 if (XactReadOnly && is_from && !cstate->rel->rd_islocaltemp)
1006 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
1007 errmsg("transaction is read-only")));
1009 /* Don't allow COPY w/ OIDs to or from a table without them */
1010 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1012 (errcode(ERRCODE_UNDEFINED_COLUMN),
1013 errmsg("table \"%s\" does not have OIDs",
1014 RelationGetRelationName(cstate->rel))));
1026 /* Don't allow COPY w/ OIDs from a select */
1029 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1030 errmsg("COPY (SELECT) WITH OIDS is not supported")));
1033 * Run parse analysis and rewrite. Note this also acquires sufficient
1034 * locks on the source table(s).
1036 * Because the parser and planner tend to scribble on their input, we
1037 * make a preliminary copy of the source querytree. This prevents
1038 * problems in the case that the COPY is in a portal or plpgsql
1039 * function and is executed repeatedly. (See also the same hack in
1040 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1042 rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query),
1043 queryString, NULL, 0);
1045 /* We don't expect more or less than one result query */
1046 if (list_length(rewritten) != 1)
1047 elog(ERROR, "unexpected rewrite result");
1049 query = (Query *) linitial(rewritten);
1050 Assert(query->commandType == CMD_SELECT);
1051 Assert(query->utilityStmt == NULL);
1053 /* Query mustn't use INTO, either */
1054 if (query->intoClause)
1056 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1057 errmsg("COPY (SELECT INTO) is not supported")));
1059 /* plan the query */
1060 plan = planner(query, 0, NULL);
1063 * Use a snapshot with an updated command ID to ensure this query sees
1064 * results of any previously executed queries.
1066 PushUpdatedSnapshot(GetActiveSnapshot());
1068 /* Create dest receiver for COPY OUT */
1069 dest = CreateDestReceiver(DestCopyOut);
/* give the receiver access to this command's state */
1070 ((DR_copy *) dest)->cstate = cstate;
1072 /* Create a QueryDesc requesting no output */
1073 cstate->queryDesc = CreateQueryDesc(plan, queryString,
1074 GetActiveSnapshot(),
1079 * Call ExecutorStart to prepare the plan for execution.
1081 * ExecutorStart computes a result tupdesc for us
1083 ExecutorStart(cstate->queryDesc, 0);
1085 tupDesc = cstate->queryDesc->tupDesc;
1088 /* Generate or convert list of attributes to process */
1089 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1091 num_phys_attrs = tupDesc->natts;
1093 /* Convert FORCE QUOTE name list to per-column flags, check validity */
1094 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1100 attnums = CopyGetAttnums(tupDesc, cstate->rel, force_quote);
1102 foreach(cur, attnums)
1104 int attnum = lfirst_int(cur);
1106 if (!list_member_int(cstate->attnumlist, attnum))
1108 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1109 errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
1110 NameStr(tupDesc->attrs[attnum - 1]->attname))));
/* attribute numbers are 1-based, the flag array is 0-based */
1111 cstate->force_quote_flags[attnum - 1] = true;
1115 /* Convert FORCE NOT NULL name list to per-column flags, check validity */
1116 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1122 attnums = CopyGetAttnums(tupDesc, cstate->rel, force_notnull);
1124 foreach(cur, attnums)
1126 int attnum = lfirst_int(cur);
1128 if (!list_member_int(cstate->attnumlist, attnum))
1130 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1131 errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
1132 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1133 cstate->force_notnull_flags[attnum - 1] = true;
1137 /* Set up variables to avoid per-attribute overhead. */
1138 initStringInfo(&cstate->attribute_buf);
1139 initStringInfo(&cstate->line_buf);
1140 cstate->line_buf_converted = false;
/* one extra byte beyond RAW_BUF_SIZE is allocated */
1141 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
1142 cstate->raw_buf_index = cstate->raw_buf_len = 0;
1143 cstate->processed = 0;
1146 * Set up encoding conversion info. Even if the client and server
1147 * encodings are the same, we must apply pg_client_to_server() to validate
1148 * data in multibyte encodings.
1150 cstate->client_encoding = pg_get_client_encoding();
/* set when the encodings differ or the database encoding is multibyte */
1151 cstate->need_transcoding =
1152 (cstate->client_encoding != GetDatabaseEncoding() ||
1153 pg_database_encoding_max_length() > 1);
1154 /* See Multibyte encoding comment above */
1155 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
1157 cstate->copy_dest = COPY_FILE; /* default */
1158 cstate->filename = stmt->filename;
1161 CopyFrom(cstate); /* copy from file to database */
1163 DoCopyTo(cstate); /* copy from database to file */
1166 * Close the relation or query. If reading, we can release the
1167 * AccessShareLock we got; if writing, we should hold the lock until end
1168 * of transaction to ensure that updates will be committed before lock is
/* NoLock for COPY FROM keeps the lock taken at open; COPY TO releases its share lock */
1172 heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
1175 /* Close down the query and free resources. */
1176 ExecutorEnd(cstate->queryDesc);
1177 FreeQueryDesc(cstate->queryDesc);
1178 PopActiveSnapshot();
1181 /* Clean up storage (probably not really necessary) */
/* saved before the buffers are freed below */
1182 processed = cstate->processed;
1184 pfree(cstate->attribute_buf.data);
1185 pfree(cstate->line_buf.data);
1186 pfree(cstate->raw_buf);
/*
 * DoCopyTo - driver for the COPY ... TO direction.
 *
 * cstate holds the complete COPY state.  A NULL cstate->filename means no
 * server-side file was named (local flag pipe).  The visible steps are:
 * reject relations that are not plain tables, choose the output target
 * (frontend, stdout, or a newly opened file), send the copy-begin and
 * copy-end messages when fe_copy is set, and close the file via FreeFile.
 * Failures are raised through the error-reporting argument lists below.
 *
 * NOTE(review): the embedded line numbers skip, so short source lines
 * (braces, else, the statements that open each error report, and whatever
 * runs between SendCopyBegin and SendCopyEnd) are not visible in this
 * excerpt.  Confirm control-flow details against the full file.
 */
1194 * This intermediate routine exists mainly to localize the effects of setjmp
1195 * so we don't need to plaster a lot of variables with "volatile".
1198 DoCopyTo(CopyState cstate)
/* pipe is true when no filename was supplied. */
1200 bool pipe = (cstate->filename == NULL);
/*
 * Only RELKIND_RELATION may be copied out directly.  Views, sequences and
 * all remaining relkinds are rejected with ERRCODE_WRONG_OBJECT_TYPE, each
 * with its own message; the view message also carries a hint pointing at
 * the COPY (SELECT ...) TO form.
 */
1204 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
1206 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
1208 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1209 errmsg("cannot copy from view \"%s\"",
1210 RelationGetRelationName(cstate->rel)),
1211 errhint("Try the COPY (SELECT ...) TO variant.")));
1212 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
1214 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1215 errmsg("cannot copy from sequence \"%s\"",
1216 RelationGetRelationName(cstate->rel))));
1219 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1220 errmsg("cannot copy from non-table relation \"%s\"",
1221 RelationGetRelationName(cstate->rel))));
/*
 * Target selection when no file is used: with whereToSendOutput equal to
 * DestRemote the data goes to the frontend (fe_copy is set).  The stdout
 * assignment is presumably the alternative branch; its else line is not
 * visible here.
 */
1227 if (whereToSendOutput == DestRemote)
1228 cstate->fe_copy = true;
1230 cstate->copy_file = stdout;
1234 mode_t oumask; /* Pre-existing umask value */
1238 * Prevent write to relative path ... too easy to shoot oneself in the
1239 * foot by overwriting a database file ...
1241 if (!is_absolute_path(cstate->filename))
1243 (errcode(ERRCODE_INVALID_NAME),
1244 errmsg("relative path not allowed for COPY to file")));
/*
 * The umask is forced to 022 while the file is opened for writing and the
 * previous mask is saved in oumask.
 * NOTE(review): the call that restores oumask is not visible in this
 * excerpt - confirm that it directly follows AllocateFile.
 */
1246 oumask = umask((mode_t) 022);
1247 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1250 if (cstate->copy_file == NULL)
1252 (errcode_for_file_access(),
1253 errmsg("could not open file \"%s\" for writing: %m",
1254 cstate->filename)));
/*
 * NOTE(review): the fstat return value is ignored.  If fstat fails, the
 * S_ISDIR test below inspects whatever st happened to contain.  Consider
 * checking the result and reporting a file-access error.
 */
1256 fstat(fileno(cstate->copy_file), &st);
1257 if (S_ISDIR(st.st_mode))
1259 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1260 errmsg("\"%s\" is a directory", cstate->filename)));
/* Begin and end messages are sent only when copying to the frontend. */
1265 if (cstate->fe_copy)
1266 SendCopyBegin(cstate);
1270 if (cstate->fe_copy)
1271 SendCopyEnd(cstate);
1276 * Make sure we turn off old-style COPY OUT mode upon error. It is
1277 * okay to do this in all cases, since it does nothing if the mode is
1280 pq_endcopyout(true);
/* A nonzero FreeFile result is reported as a write failure on the file. */
1287 if (FreeFile(cstate->copy_file))
1289 (errcode_for_file_access(),
1290 errmsg("could not write to file \"%s\": %m",
1291 cstate->filename)));
/*
 * CopyTo - produce all rows for COPY ... TO.
 *
 * cstate supplies either a relation (cstate->rel) or a prepared query
 * (cstate->queryDesc); the tuple descriptor is taken from whichever one is
 * in use.  Visible phases: look up one output function per copied column,
 * create a per-row memory context, emit the binary file header or (for
 * non-binary output) convert the null marker and optionally emit a header
 * line of column names, then either scan the heap and call CopyOneRowTo per
 * tuple or run the query through ExecutorRun.  A binary trailer is sent at
 * the end and the per-row context is deleted.
 *
 * NOTE(review): the embedded line numbers skip, so the conditions choosing
 * between relation/query and binary/text paths are not visible in this
 * excerpt; they are presumably tests of cstate->rel and cstate->binary.
 */
1296 * Copy from relation or query TO file.
1299 CopyTo(CopyState cstate)
1303 Form_pg_attribute *attr;
/* Descriptor source: the relation, or else the query result descriptor. */
1307 tupDesc = RelationGetDescr(cstate->rel);
1309 tupDesc = cstate->queryDesc->tupDesc;
1310 attr = tupDesc->attrs;
1311 num_phys_attrs = tupDesc->natts;
1312 cstate->null_print_client = cstate->null_print; /* default */
1314 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1315 cstate->fe_msgbuf = makeStringInfo();
1317 /* Get info about the columns we need to process. */
/*
 * out_functions is sized for every physical attribute and indexed by
 * attnum - 1, but only the entries for columns in attnumlist are filled in.
 * Either the binary or the text output-function lookup is used per column.
 */
1318 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1319 foreach(cur, cstate->attnumlist)
1321 int attnum = lfirst_int(cur);
1326 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1330 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1333 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1337 * Create a temporary memory context that we can reset once per row to
1338 * recover palloc'd memory. This avoids any problems with leaks inside
1339 * datatype output routines, and should be faster than retail pfree's
1340 * anyway. (We don't need a whole econtext as CopyFrom does.)
1342 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1344 ALLOCSET_DEFAULT_MINSIZE,
1345 ALLOCSET_DEFAULT_INITSIZE,
1346 ALLOCSET_DEFAULT_MAXSIZE);
/*
 * Binary header as visible here: an 11-byte signature followed by two
 * 32-bit integers held in tmp (the second is the header extension length,
 * per the comment below).  The assignments to tmp are not visible in this
 * excerpt.
 */
1350 /* Generate header for a binary copy */
1354 CopySendData(cstate, (char *) BinarySignature, 11);
1359 CopySendInt32(cstate, tmp);
1360 /* No header extension */
1362 CopySendInt32(cstate, tmp);
1367 * For non-binary copy, we need to convert null_print to client
1368 * encoding, because it will be sent directly with CopySendString.
1370 if (cstate->need_transcoding)
1371 cstate->null_print_client = pg_server_to_client(cstate->null_print,
1372 cstate->null_print_len);
1374 /* if a header has been requested send the line */
/*
 * The header line lists the names of the copied columns, written with the
 * CSV attribute writer and separated by the first delimiter character.
 * hdr_delim starts false; presumably it suppresses the delimiter before the
 * first name (the test itself is not visible here).
 */
1375 if (cstate->header_line)
1377 bool hdr_delim = false;
1379 foreach(cur, cstate->attnumlist)
1381 int attnum = lfirst_int(cur);
1385 CopySendChar(cstate, cstate->delim[0]);
1388 colname = NameStr(attr[attnum - 1]->attname);
1390 CopyAttributeOutCSV(cstate, colname, false,
1391 list_length(cstate->attnumlist) == 1);
1394 CopySendEndOfRow(cstate);
/*
 * Relation case: scan the heap under the active snapshot, deform each tuple
 * into the values/nulls arrays (indexed by attribute position) and hand the
 * row to CopyOneRowTo together with the tuple OID.
 */
1402 HeapScanDesc scandesc;
1405 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1406 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
1408 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
1410 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
1412 CHECK_FOR_INTERRUPTS();
1414 /* Deconstruct the tuple ... faster than repeated heap_getattr */
1415 heap_deform_tuple(tuple, tupDesc, values, nulls);
1417 /* Format and send the data */
1418 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
1421 heap_endscan(scandesc);
1425 /* run the plan --- the dest receiver will send tuples */
1426 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
/* The binary trailer is a 16-bit -1, flushed as a row of its own. */
1431 /* Generate trailer for a binary copy */
1432 CopySendInt16(cstate, -1);
1433 /* Need to flush out the trailer */
1434 CopySendEndOfRow(cstate);
1437 MemoryContextDelete(cstate->rowcontext);
/*
 * CopyOneRowTo - format a single row and send it to the COPY destination.
 *
 * cstate:   COPY state; supplies attnumlist, out_functions, the format
 *           flags and the per-row memory context.
 * tupleOid: OID of the row; sent only when OIDs are wanted.
 * values:   column datums, indexed by attnum - 1.
 * nulls:    matching null flags, indexed by attnum - 1.
 *
 * All work runs inside cstate->rowcontext, which is reset at the start of
 * every call; the previous context is restored before returning.  The row
 * counter cstate->processed is incremented once per call.
 *
 * NOTE(review): the embedded line numbers skip, so several branch lines
 * (else, the OID test, the null test, the need_delim handling) are not
 * visible in this excerpt.
 */
1441 * Emit one row during CopyTo().
1444 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
1446 bool need_delim = false;
1447 FmgrInfo *out_functions = cstate->out_functions;
1448 MemoryContext oldcontext;
/* Discard allocations from the previous row, then allocate in rowcontext. */
1452 MemoryContextReset(cstate->rowcontext);
1453 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
/* Binary rows start with a 16-bit count of the copied columns. */
1457 /* Binary per-tuple header */
1458 CopySendInt16(cstate, list_length(cstate->attnumlist));
1459 /* Send OID if wanted --- note attnumlist doesn't include it */
1462 /* Hack --- assume Oid is same size as int32 */
1463 CopySendInt32(cstate, sizeof(int32));
1464 CopySendInt32(cstate, tupleOid);
1469 /* Text format has no per-tuple header, but send OID if wanted */
1470 /* Assume digits don't need any quoting or encoding conversion */
1473 string = DatumGetCString(DirectFunctionCall1(oidout,
1474 ObjectIdGetDatum(tupleOid)));
1475 CopySendString(cstate, string);
/* Emit each requested column in attnumlist order. */
1480 foreach(cur, cstate->attnumlist)
1482 int attnum = lfirst_int(cur);
1483 Datum value = values[attnum - 1];
1484 bool isnull = nulls[attnum - 1];
/* Non-binary output separates fields with the first delimiter character. */
1486 if (!cstate->binary)
1489 CopySendChar(cstate, cstate->delim[0]);
/*
 * Null handling as visible here: non-binary output writes the
 * client-encoded null marker; the 32-bit -1 is presumably the binary
 * counterpart (its else line is not visible).
 */
1495 if (!cstate->binary)
1496 CopySendString(cstate, cstate->null_print_client);
1498 CopySendInt32(cstate, -1);
/* Non-null value: run the type output function, then escape per format. */
1502 if (!cstate->binary)
1504 string = OutputFunctionCall(&out_functions[attnum - 1],
1506 if (cstate->csv_mode)
1507 CopyAttributeOutCSV(cstate, string,
1508 cstate->force_quote_flags[attnum - 1],
1509 list_length(cstate->attnumlist) == 1);
1511 CopyAttributeOutText(cstate, string);
/*
 * Binary value: a 32-bit length (VARSIZE minus VARHDRSZ) followed by that
 * many bytes taken from VARDATA of the send-function result.
 */
1517 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
1519 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
1520 CopySendData(cstate, VARDATA(outputbytes),
1521 VARSIZE(outputbytes) - VARHDRSZ);
1526 CopySendEndOfRow(cstate);
1528 MemoryContextSwitchTo(oldcontext);
1530 cstate->processed++;
/*
 * copy_in_error_callback - add COPY FROM position info to an error report.
 *
 * arg is the CopyState of the running COPY (cast below).  Depending on what
 * is known, the context line names the relation and line number, plus the
 * column name and a length-limited copy of the offending value or input
 * line.
 *
 * NOTE(review): the embedded line numbers skip, so the outermost condition
 * and the else lines are not visible in this excerpt; the first group of
 * messages (no data shown) is presumably the binary-format case.
 */
1535 * error context callback for COPY FROM
1538 copy_in_error_callback(void *arg)
1540 CopyState cstate = (CopyState) arg;
1544 /* can't usefully display the data */
1545 if (cstate->cur_attname)
1546 errcontext("COPY %s, line %d, column %s",
1547 cstate->cur_relname, cstate->cur_lineno,
1548 cstate->cur_attname);
1550 errcontext("COPY %s, line %d",
1551 cstate->cur_relname, cstate->cur_lineno);
/* Column name and value both known: show a truncated copy of the value. */
1555 if (cstate->cur_attname && cstate->cur_attval)
1557 /* error is relevant to a particular column */
1560 attval = limit_printout_length(cstate->cur_attval);
1561 errcontext("COPY %s, line %d, column %s: \"%s\"",
1562 cstate->cur_relname, cstate->cur_lineno,
1563 cstate->cur_attname, attval);
1566 else if (cstate->cur_attname)
1568 /* error is relevant to a particular column, value is NULL */
1569 errcontext("COPY %s, line %d, column %s: null input",
1570 cstate->cur_relname, cstate->cur_lineno,
1571 cstate->cur_attname);
/*
 * No column is current.  The raw line is shown only if it has already been
 * converted to the server encoding or no transcoding is needed at all.
 */
1575 /* error is relevant to a particular line */
1576 if (cstate->line_buf_converted || !cstate->need_transcoding)
1580 lineval = limit_printout_length(cstate->line_buf.data);
1581 errcontext("COPY %s, line %d: \"%s\"",
1582 cstate->cur_relname, cstate->cur_lineno, lineval);
1588 * Here, the line buffer is still in a foreign encoding, and
1589 * indeed it's quite likely that the error is precisely a
1590 * failure to do encoding conversion (ie, bad data). We dare
1591 * not try to convert it, and at present there's no way to
1592 * regurgitate it without conversion. So we have to punt and
1593 * just report the line number.
1595 errcontext("COPY %s, line %d",
1596 cstate->cur_relname, cstate->cur_lineno);
/*
 * limit_printout_length - return a display-sized copy of str.
 *
 * str: NUL-terminated input (its length is taken with strlen).
 *
 * Inputs of at most MAX_COPY_DATA_DISPLAY bytes are duplicated unchanged
 * with pstrdup.  Longer inputs are cut at the byte length returned by
 * pg_mbcliplen for that limit, and three dots are appended.
 *
 * NOTE(review): the embedded line numbers skip; the local declarations and
 * the final return of res are not visible in this excerpt.
 */
1603 * Make sure we don't print an unreasonable amount of COPY data in a message.
1605 * It would seem a lot easier to just use the sprintf "precision" limit to
1606 * truncate the string. However, some versions of glibc have a bug/misfeature
1607 * that vsnprintf will always fail (return -1) if it is asked to truncate
1608 * a string that contains invalid byte sequences for the current encoding.
1609 * So, do our own truncation. We return a pstrdup'd copy of the input.
1612 limit_printout_length(const char *str)
/* The limit is a byte count: it is compared against the strlen result. */
1614 #define MAX_COPY_DATA_DISPLAY 100
1616 int slen = strlen(str);
1620 /* Fast path if definitely okay */
1621 if (slen <= MAX_COPY_DATA_DISPLAY)
1622 return pstrdup(str);
1624 /* Apply encoding-dependent truncation */
1625 len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
1628 * Truncate, and add "..." to show we truncated the input.
/* len + 4 bytes: the kept prefix, three dots, and the terminating NUL. */
1630 res = (char *) palloc(len + 4);
1631 memcpy(res, str, len);
1632 strcpy(res + len, "...");
/*
 * CopyFrom - load rows from the COPY source into cstate->rel.
 *
 * cstate is the COPY state; cstate->rel must be set (asserted below) and
 * must be a plain table.  Visible phases, in order:
 *   1. reject non-table targets and decide heap_insert options;
 *   2. select the input source (frontend, stdin, or a file opened for
 *      reading);
 *   3. build a ResultRelInfo, executor state and tuple slot so that the
 *      regular index, constraint and trigger machinery can be used;
 *   4. look up per-column input functions and prepare default expressions
 *      for columns that are not supplied by the input;
 *   5. verify the binary file header when reading binary input;
 *   6. per row: read and convert fields, fill in defaults, form the tuple,
 *      run BEFORE ROW triggers and constraints, insert into the heap and
 *      indexes, run AFTER ROW triggers, and count the row;
 *   7. clean up, fire statement-level AFTER triggers, close the file and
 *      sync the heap when WAL was skipped.
 *
 * NOTE(review): the embedded line numbers skip, so short source lines
 * (braces, else, loop headers, the statements that open each error report,
 * several declarations and assignments) are not visible in this excerpt.
 * Control-flow remarks below are hedged where they depend on such lines.
 */
1638 * Copy FROM file to relation.
1641 CopyFrom(CopyState cstate)
1643 bool pipe = (cstate->filename == NULL);
1646 Form_pg_attribute *attr;
1647 AttrNumber num_phys_attrs,
1650 FmgrInfo *in_functions;
1651 FmgrInfo oid_in_function;
1660 char **field_strings;
1663 ResultRelInfo *resultRelInfo;
1664 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
1665 TupleTableSlot *slot;
1668 ExprState **defexprs; /* array of default att expressions */
1669 ExprContext *econtext; /* used for ExecEvalExpr for default atts */
1670 MemoryContext oldcontext = CurrentMemoryContext;
1671 ErrorContextCallback errcontext;
1672 CommandId mycid = GetCurrentCommandId(true);
1673 int hi_options = 0; /* start with default heap_insert options */
1674 BulkInsertState bistate;
1676 Assert(cstate->rel);
/*
 * Only RELKIND_RELATION can be loaded.  Views, sequences and all other
 * relkinds are rejected with ERRCODE_WRONG_OBJECT_TYPE.
 */
1678 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
1680 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
1682 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1683 errmsg("cannot copy to view \"%s\"",
1684 RelationGetRelationName(cstate->rel))));
1685 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
1687 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1688 errmsg("cannot copy to sequence \"%s\"",
1689 RelationGetRelationName(cstate->rel))));
1692 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1693 errmsg("cannot copy to non-table relation \"%s\"",
1694 RelationGetRelationName(cstate->rel))));
1698 * Check to see if we can avoid writing WAL
1700 * If archive logging is not enabled *and* either
1701 * - table was created in same transaction as this COPY
1702 * - data is being written to relfilenode created in this transaction
1703 * then we can skip writing WAL. It's safe because if the transaction
1704 * doesn't commit, we'll discard the table (or the new relfilenode file).
1705 * If it does commit, we'll have done the heap_sync at the bottom of this
1708 * As mentioned in comments in utils/rel.h, the in-same-transaction test
1709 * is not completely reliable, since in rare cases rd_createSubid or
1710 * rd_newRelfilenodeSubid can be cleared before the end of the transaction.
1711 * However this is OK since at worst we will fail to make the optimization.
1713 * Also, if the target file is new-in-transaction, we assume that checking
1714 * FSM for free space is a waste of time, even if we must use WAL because
1715 * of archiving. This could possibly be wrong, but it's unlikely.
1717 * The comments for heap_insert and RelationGetBufferForTuple specify that
1718 * skipping WAL logging is only safe if we ensure that our tuples do not
1719 * go into pages containing tuples from any other transactions --- but this
1720 * must be the case if we have a new table or new relfilenode, so we need
1721 * no additional work to enforce that.
1724 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
1725 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
1727 hi_options |= HEAP_INSERT_SKIP_FSM;
1728 if (!XLogArchivingActive())
1729 hi_options |= HEAP_INSERT_SKIP_WAL;
/*
 * Source selection: the frontend when whereToSendOutput is DestRemote;
 * stdin is presumably the alternative when no file is named; otherwise the
 * named file is opened for binary reading.
 */
1734 if (whereToSendOutput == DestRemote)
1735 ReceiveCopyBegin(cstate);
1737 cstate->copy_file = stdin;
1743 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1745 if (cstate->copy_file == NULL)
1747 (errcode_for_file_access(),
1748 errmsg("could not open file \"%s\" for reading: %m",
1749 cstate->filename)));
/*
 * NOTE(review): the fstat return value is ignored.  If fstat fails, the
 * S_ISDIR test below inspects whatever st happened to contain.  Consider
 * checking the result and reporting a file-access error.
 */
1751 fstat(fileno(cstate->copy_file), &st);
1752 if (S_ISDIR(st.st_mode))
1754 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1755 errmsg("\"%s\" is a directory", cstate->filename)));
1758 tupDesc = RelationGetDescr(cstate->rel);
1759 attr = tupDesc->attrs;
1760 num_phys_attrs = tupDesc->natts;
1761 attr_count = list_length(cstate->attnumlist);
1765 * We need a ResultRelInfo so we can use the regular executor's
1766 * index-entry-making machinery. (There used to be a huge amount of code
1767 * here that basically duplicated execUtils.c ...)
1769 resultRelInfo = makeNode(ResultRelInfo);
1770 resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
1771 resultRelInfo->ri_RelationDesc = cstate->rel;
1772 resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
1773 if (resultRelInfo->ri_TrigDesc)
1774 resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
1775 palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
1776 resultRelInfo->ri_TrigInstrument = NULL;
1778 ExecOpenIndices(resultRelInfo);
/* The executor state is told about exactly one result relation. */
1780 estate->es_result_relations = resultRelInfo;
1781 estate->es_num_result_relations = 1;
1782 estate->es_result_relation_info = resultRelInfo;
1784 /* Set up a tuple slot too */
1785 slot = MakeSingleTupleTableSlot(tupDesc);
1787 econtext = GetPerTupleExprContext(estate);
1790 * Pick up the required catalog information for each attribute in the
1791 * relation, including the input function, the element type (to pass to
1792 * the input function), and info about defaults and constraints. (Which
1793 * input function we use depends on text/binary format choice.)
/*
 * in_functions and typioparams are indexed by attnum - 1.  defmap and
 * defexprs are filled densely: entry i (for i below num_defaults) holds the
 * zero-based column index and the prepared default expression of the i-th
 * column that is absent from attnumlist and has a default.
 */
1795 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1796 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1797 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1798 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1800 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
1802 /* We don't need info for dropped attributes */
1803 if (attr[attnum - 1]->attisdropped)
1806 /* Fetch the input function and typioparam info */
1808 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
1809 &in_func_oid, &typioparams[attnum - 1]);
1811 getTypeInputInfo(attr[attnum - 1]->atttypid,
1812 &in_func_oid, &typioparams[attnum - 1]);
1813 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1815 /* Get default info if needed */
1816 if (!list_member_int(cstate->attnumlist, attnum))
1818 /* attribute is NOT to be copied from input */
1819 /* use default value if one exists */
1820 Node *defexpr = build_column_default(cstate->rel, attnum);
1822 if (defexpr != NULL)
1824 defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
1826 defmap[num_defaults] = attnum - 1;
1832 /* Prepare to catch AFTER triggers. */
1833 AfterTriggerBeginQuery();
1836 * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
1837 * should do this for COPY, since it's not really an "INSERT" statement as
1838 * such. However, executing these triggers maintains consistency with the
1839 * EACH ROW triggers that we already fire on COPY.
1841 ExecBSInsertTriggers(estate, resultRelInfo);
/*
 * Text and CSV input carry no self-description, so the OID column is
 * expected exactly when the user asked for it.  Binary input announces it
 * in the header flags instead.
 */
1843 if (!cstate->binary)
1844 file_has_oids = cstate->oids; /* must rely on user to tell us... */
1847 /* Read and verify binary header */
/*
 * Binary header layout as read here: an 11-byte signature, a 32-bit flags
 * word (bit 16 means OIDs are present), and a 32-bit extension length.
 * NOTE(review): a statement between the two flag tests is not visible in
 * this excerpt; presumably it clears bit 16 so that the critical-flags test
 * only fires for other high bits.  The extension bytes are read one at a
 * time into readSig, presumably inside a countdown loop that is also not
 * visible.
 */
1852 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
1853 memcmp(readSig, BinarySignature, 11) != 0)
1855 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1856 errmsg("COPY file signature not recognized")));
1858 if (!CopyGetInt32(cstate, &tmp))
1860 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1861 errmsg("invalid COPY file header (missing flags)")));
1862 file_has_oids = (tmp & (1 << 16)) != 0;
1864 if ((tmp >> 16) != 0)
1866 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1867 errmsg("unrecognized critical flags in COPY file header")));
1868 /* Header extension length */
1869 if (!CopyGetInt32(cstate, &tmp) ||
1872 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1873 errmsg("invalid COPY file header (missing length)")));
1874 /* Skip extension header, if present */
1877 if (CopyGetData(cstate, readSig, 1, 1) != 1)
1879 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1880 errmsg("invalid COPY file header (wrong length)")));
/* Binary OIDs are decoded with the binary input function of type OIDOID. */
1884 if (file_has_oids && cstate->binary)
1886 getTypeBinaryInputInfo(OIDOID,
1887 &in_func_oid, &oid_typioparam);
1888 fmgr_info(in_func_oid, &oid_in_function);
1891 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1892 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
/* One slot per copied column, plus one for the OID field when present. */
1894 /* create workspace for CopyReadAttributes results */
1895 nfields = file_has_oids ? (attr_count + 1) : attr_count;
1896 field_strings = (char **) palloc(nfields * sizeof(char *));
1898 /* Initialize state variables */
1899 cstate->fe_eof = false;
1900 cstate->eol_type = EOL_UNKNOWN;
1901 cstate->cur_relname = RelationGetRelationName(cstate->rel);
1902 cstate->cur_lineno = 0;
1903 cstate->cur_attname = NULL;
1904 cstate->cur_attval = NULL;
1906 bistate = GetBulkInsertState();
/*
 * Push the callback onto error_context_stack; the previous top is saved in
 * errcontext.previous and restored in the cleanup section near the end.
 */
1908 /* Set up callback to identify error line number */
1909 errcontext.callback = copy_in_error_callback;
1910 errcontext.arg = (void *) cstate;
1911 errcontext.previous = error_context_stack;
1912 error_context_stack = &errcontext;
1914 /* on input just throw the header line away */
1915 if (cstate->header_line)
1917 cstate->cur_lineno++;
1918 done = CopyReadLine(cstate);
/* Per-row processing starts here (the loop header is not visible). */
1924 Oid loaded_oid = InvalidOid;
1926 CHECK_FOR_INTERRUPTS();
1928 cstate->cur_lineno++;
1930 /* Reset the per-tuple exprcontext */
1931 ResetPerTupleExprContext(estate);
1933 /* Switch into its memory context */
1934 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1936 /* Initialize all values for row to NULL */
1937 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
1938 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
/* Text and CSV path: read one line, split it into fields, convert each. */
1940 if (!cstate->binary)
1947 /* Actually read the line into memory here */
1948 done = CopyReadLine(cstate);
1951 * EOF at start of line means we're done. If we see EOF after
1952 * some characters, we act as though it was newline followed by
1953 * EOF, ie, process the line and then exit loop on next iteration.
1955 if (done && cstate->line_buf.len == 0)
1958 /* Parse the line into de-escaped field values */
1959 if (cstate->csv_mode)
1960 fldct = CopyReadAttributesCSV(cstate, nfields, field_strings);
1962 fldct = CopyReadAttributesText(cstate, nfields, field_strings);
/*
 * While a field is being converted, cur_attname and cur_attval are set so
 * that the error callback can name the column and value; they are cleared
 * again afterwards.
 */
1965 /* Read the OID field if present */
1968 if (fieldno >= fldct)
1970 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1971 errmsg("missing data for OID column")));
1972 string = field_strings[fieldno++];
1976 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1977 errmsg("null OID in COPY data")));
1980 cstate->cur_attname = "oid";
1981 cstate->cur_attval = string;
1982 loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
1983 CStringGetDatum(string)));
1984 if (loaded_oid == InvalidOid)
1986 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1987 errmsg("invalid OID in COPY data")));
1988 cstate->cur_attname = NULL;
1989 cstate->cur_attval = NULL;
/*
 * m is used below as the zero-based column index; its assignment is not
 * visible in this excerpt (presumably attnum - 1).
 */
1993 /* Loop to read the user attributes on the line. */
1994 foreach(cur, cstate->attnumlist)
1996 int attnum = lfirst_int(cur);
1999 if (fieldno >= fldct)
2001 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2002 errmsg("missing data for column \"%s\"",
2003 NameStr(attr[m]->attname))));
2004 string = field_strings[fieldno++];
/*
 * FORCE NOT NULL: in CSV mode a NULL field for a flagged column is replaced
 * by the null marker text itself, so the input function receives a string
 * rather than a NULL.
 */
2006 if (cstate->csv_mode && string == NULL &&
2007 cstate->force_notnull_flags[m])
2009 /* Go ahead and read the NULL string */
2010 string = cstate->null_print;
2013 cstate->cur_attname = NameStr(attr[m]->attname);
2014 cstate->cur_attval = string;
2015 values[m] = InputFunctionCall(&in_functions[m],
2018 attr[m]->atttypmod);
2021 cstate->cur_attname = NULL;
2022 cstate->cur_attval = NULL;
/* Every expected field must have been consumed by this point. */
2025 Assert(fieldno == nfields);
/*
 * Binary path: each row starts with a 16-bit field count that must equal
 * the number of copied columns; the OID (when present) and each column are
 * then read with CopyReadBinaryAttribute.
 */
2033 if (!CopyGetInt16(cstate, &fld_count) ||
2040 if (fld_count != attr_count)
2042 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2043 errmsg("row field count is %d, expected %d",
2044 (int) fld_count, attr_count)));
2048 cstate->cur_attname = "oid";
2050 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
2056 if (isnull || loaded_oid == InvalidOid)
2058 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2059 errmsg("invalid OID in COPY data")));
2060 cstate->cur_attname = NULL;
2064 foreach(cur, cstate->attnumlist)
2066 int attnum = lfirst_int(cur);
2069 cstate->cur_attname = NameStr(attr[m]->attname);
2071 values[m] = CopyReadBinaryAttribute(cstate,
2077 cstate->cur_attname = NULL;
2082 * Now compute and insert any defaults available for the columns not
2083 * provided by the input data. Anything not processed here or above
2086 for (i = 0; i < num_defaults; i++)
2088 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
2089 &nulls[defmap[i]], NULL);
2092 /* And now we can form the input tuple. */
2093 tuple = heap_form_tuple(tupDesc, values, nulls);
/* The loaded OID is applied only if OIDs were requested and present. */
2095 if (cstate->oids && file_has_oids)
2096 HeapTupleSetOid(tuple, loaded_oid);
2098 /* Triggers and stuff need to be invoked in query context. */
2099 MemoryContextSwitchTo(oldcontext);
/*
 * A BEFORE ROW trigger may return NULL (row is not inserted) or a different
 * tuple, in which case the original tuple is freed.
 */
2103 /* BEFORE ROW INSERT Triggers */
2104 if (resultRelInfo->ri_TrigDesc &&
2105 resultRelInfo->ri_TrigDesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
2109 newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
2111 if (newtuple == NULL) /* "do nothing" */
2113 else if (newtuple != tuple) /* modified by Trigger(s) */
2115 heap_freetuple(tuple);
2122 /* Place tuple in tuple slot */
2123 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2125 /* Check the constraints of the tuple */
2126 if (cstate->rel->rd_att->constr)
2127 ExecConstraints(resultRelInfo, slot, estate);
/* hi_options carries the SKIP_FSM and SKIP_WAL decisions made earlier. */
2129 /* OK, store the tuple and create index entries for it */
2130 heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
2132 if (resultRelInfo->ri_NumIndices > 0)
2133 ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
2135 /* AFTER ROW INSERT Triggers */
2136 ExecARInsertTriggers(estate, resultRelInfo, tuple);
2139 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2140 * this is the same definition used by execMain.c for counting
2141 * tuples inserted by an INSERT command.
2143 cstate->processed++;
/* Pop the error context callback installed before the row loop. */
2147 /* Done, clean up */
2148 error_context_stack = errcontext.previous;
2150 FreeBulkInsertState(bistate);
2152 MemoryContextSwitchTo(oldcontext);
2154 /* Execute AFTER STATEMENT insertion triggers */
2155 ExecASInsertTriggers(estate, resultRelInfo);
2157 /* Handle queued AFTER triggers */
2158 AfterTriggerEndQuery(estate);
2162 pfree(field_strings);
2164 pfree(in_functions);
2169 ExecDropSingleTupleTableSlot(slot);
2171 ExecCloseIndices(resultRelInfo);
2173 FreeExecutorState(estate);
/* A nonzero FreeFile result is reported as a read failure on the file. */
2177 if (FreeFile(cstate->copy_file))
2179 (errcode_for_file_access(),
2180 errmsg("could not read from file \"%s\": %m",
2181 cstate->filename)));
2185 * If we skipped writing WAL, then we need to sync the heap (but not
2186 * indexes since those use WAL anyway)
2188 if (hi_options & HEAP_INSERT_SKIP_WAL)
2189 heap_sync(cstate->rel);
/*
 * CopyReadLine - fetch the next input line into cstate->line_buf.
 *
 * The line buffer is reset, filled by CopyReadLineText, stripped of its
 * end-of-line marker, and converted to the server encoding when
 * cstate->need_transcoding is set.  line_buf_converted is false while the
 * buffer may still hold client-encoded bytes and is set to true at the end.
 * The returned flag is the result of CopyReadLineText (true on EOF, per the
 * description below).
 *
 * NOTE(review): the embedded line numbers skip, so the EOF test, the case
 * labels of the switch and the final return are not visible in this
 * excerpt.
 */
2194 * Read the next input line and stash it in line_buf, with conversion to
2197 * Result is true if read was terminated by EOF, false if terminated
2198 * by newline. The terminating newline or EOF marker is not included
2199 * in the final value of line_buf.
2202 CopyReadLine(CopyState cstate)
2206 resetStringInfo(&cstate->line_buf);
2208 /* Mark that encoding conversion hasn't occurred yet */
2209 cstate->line_buf_converted = false;
2211 /* Parse data and transfer into line_buf */
2212 result = CopyReadLineText(cstate);
2217 * Reached EOF. In protocol version 3, we should ignore anything
2218 * after \. up to the protocol end of copy data. (XXX maybe better
2219 * not to treat \. as special?)
/*
 * Drain loop: mark everything buffered as consumed, then keep loading until
 * CopyLoadRawBuf reports that no more data is available.
 */
2221 if (cstate->copy_dest == COPY_NEW_FE)
2225 cstate->raw_buf_index = cstate->raw_buf_len;
2226 } while (CopyLoadRawBuf(cstate));
2232 * If we didn't hit EOF, then we must have transferred the EOL marker
2233 * to line_buf along with the data. Get rid of it.
/*
 * Three strip variants follow, distinguished by the assertions: a trailing
 * newline, a trailing carriage return, or the two-byte pair of both.  Each
 * shortens len and rewrites the terminating NUL.
 */
2235 switch (cstate->eol_type)
2238 Assert(cstate->line_buf.len >= 1);
2239 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2240 cstate->line_buf.len--;
2241 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2244 Assert(cstate->line_buf.len >= 1);
2245 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
2246 cstate->line_buf.len--;
2247 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2250 Assert(cstate->line_buf.len >= 2);
2251 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
2252 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2253 cstate->line_buf.len -= 2;
2254 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2257 /* shouldn't get here */
/*
 * The converted text is copied back only when pg_client_to_server returned
 * a pointer different from the input buffer.
 */
2263 /* Done reading the line. Convert it to server encoding. */
2264 if (cstate->need_transcoding)
2268 cvt = pg_client_to_server(cstate->line_buf.data,
2269 cstate->line_buf.len);
2270 if (cvt != cstate->line_buf.data)
2272 /* transfer converted data back to line_buf */
2273 resetStringInfo(&cstate->line_buf);
2274 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
2279 /* Now it's safe to use the buffer in error messages */
2280 cstate->line_buf_converted = true;
2286 * CopyReadLineText - inner loop of CopyReadLine for text mode
2289 CopyReadLineText(CopyState cstate)
2294 bool need_data = false;
2295 bool hit_eof = false;
2296 bool result = false;
2300 bool first_char_in_line = true;
2301 bool in_quote = false,
2302 last_was_esc = false;
2304 char escapec = '\0';
2306 if (cstate->csv_mode)
2308 quotec = cstate->quote[0];
2309 escapec = cstate->escape[0];
2310 /* ignore special escape processing if it's the same as quotec */
2311 if (quotec == escapec)
2315 mblen_str[1] = '\0';
2318 * The objective of this loop is to transfer the entire next input line
2319 * into line_buf. Hence, we only care for detecting newlines (\r and/or
2320 * \n) and the end-of-copy marker (\.).
2322 * In CSV mode, \r and \n inside a quoted field are just part of the data
2323 * value and are put in line_buf. We keep just enough state to know if we
2324 * are currently in a quoted field or not.
2326 * These four characters, and the CSV escape and quote characters, are
2327 * assumed the same in frontend and backend encodings.
2329 * For speed, we try to move data from raw_buf to line_buf in chunks
2330 * rather than one character at a time. raw_buf_ptr points to the next
2331 * character to examine; any characters from raw_buf_index to raw_buf_ptr
2332 * have been determined to be part of the line, but not yet transferred to
2335 * For a little extra speed within the loop, we copy raw_buf and
2336 * raw_buf_len into local variables.
2338 copy_raw_buf = cstate->raw_buf;
2339 raw_buf_ptr = cstate->raw_buf_index;
2340 copy_buf_len = cstate->raw_buf_len;
2348 * Load more data if needed. Ideally we would just force four bytes
2349 * of read-ahead and avoid the many calls to
2350 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
2351 * does not allow us to read too far ahead or we might read into the
2352 * next data, so we read-ahead only as far as we know we can. One
2353 * optimization would be to read-ahead four bytes here if
2354 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
2355 * considering the size of the buffer.
2357 if (raw_buf_ptr >= copy_buf_len || need_data)
2362 * Try to read some more data. This will certainly reset
2363 * raw_buf_index to zero, and raw_buf_ptr must go with it.
2365 if (!CopyLoadRawBuf(cstate))
2368 copy_buf_len = cstate->raw_buf_len;
2371 * If we are completely out of data, break out of the loop,
2374 if (copy_buf_len <= 0)
2382 /* OK to fetch a character */
2383 prev_raw_ptr = raw_buf_ptr;
2384 c = copy_raw_buf[raw_buf_ptr++];
2386 if (cstate->csv_mode)
2389 * If character is '\\' or '\r', we may need to look ahead below.
2390 * Force fetch of the next character if we don't already have it.
2391 * We need to do this before changing CSV state, in case one of
2392 * these characters is also the quote or escape character.
2394 * Note: old-protocol does not like forced prefetch, but it's OK
2395 * here since we cannot validly be at EOF.
2397 if (c == '\\' || c == '\r')
2399 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2403 * Dealing with quotes and escapes here is mildly tricky. If the
2404 * quote char is also the escape char, there's no problem - we
2405 * just use the char as a toggle. If they are different, we need
2406 * to ensure that we only take account of an escape inside a
2407 * quoted field and immediately preceding a quote char, and not
2408 * the second in an escape-escape sequence.
2410 if (in_quote && c == escapec)
2411 last_was_esc = !last_was_esc;
2412 if (c == quotec && !last_was_esc)
2413 in_quote = !in_quote;
2415 last_was_esc = false;
2418 * Updating the line count for embedded CR and/or LF chars is
2419 * necessarily a little fragile - this test is probably about the
2420 * best we can do. (XXX it's arguable whether we should do this
2421 * at all --- is cur_lineno a physical or logical count?)
2423 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
2424 cstate->cur_lineno++;
2428 if (c == '\r' && (!cstate->csv_mode || !in_quote))
2430 /* Check for \r\n on first line, _and_ handle \r\n. */
2431 if (cstate->eol_type == EOL_UNKNOWN ||
2432 cstate->eol_type == EOL_CRNL)
2435 * If need more data, go back to loop top to load it.
2437 * Note that if we are at EOF, c will wind up as '\0' because
2438 * of the guaranteed pad of raw_buf.
2440 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2443 c = copy_raw_buf[raw_buf_ptr];
2447 raw_buf_ptr++; /* eat newline */
2448 cstate->eol_type = EOL_CRNL; /* in case not set yet */
2452 /* found \r, but no \n */
2453 if (cstate->eol_type == EOL_CRNL)
2455 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2457 errmsg("literal carriage return found in data") :
2458 errmsg("unquoted carriage return found in data"),
2460 errhint("Use \"\\r\" to represent carriage return.") :
2461 errhint("Use quoted CSV field to represent carriage return.")));
2464 * if we got here, it is the first line and we didn't find
2465 * \n, so don't consume the peeked character
2467 cstate->eol_type = EOL_CR;
2470 else if (cstate->eol_type == EOL_NL)
2472 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2474 errmsg("literal carriage return found in data") :
2475 errmsg("unquoted carriage return found in data"),
2477 errhint("Use \"\\r\" to represent carriage return.") :
2478 errhint("Use quoted CSV field to represent carriage return.")));
2479 /* If we reach here, we have found the line terminator */
2484 if (c == '\n' && (!cstate->csv_mode || !in_quote))
2486 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
2488 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2490 errmsg("literal newline found in data") :
2491 errmsg("unquoted newline found in data"),
2493 errhint("Use \"\\n\" to represent newline.") :
2494 errhint("Use quoted CSV field to represent newline.")));
2495 cstate->eol_type = EOL_NL; /* in case not set yet */
2496 /* If we reach here, we have found the line terminator */
2501 * In CSV mode, we only recognize \. alone on a line. This is because
2502 * \. is a valid CSV data value.
2504 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
2508 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2509 IF_NEED_REFILL_AND_EOF_BREAK(0);
2512 * get next character
2513 * Note: we do not change c so if it isn't \., we can fall
2514 * through and continue processing for client encoding.
2517 c2 = copy_raw_buf[raw_buf_ptr];
2521 raw_buf_ptr++; /* consume the '.' */
2524 * Note: if we loop back for more data here, it does not
2525 * matter that the CSV state change checks are re-executed; we
2526 * will come back here with no important state changed.
2528 if (cstate->eol_type == EOL_CRNL)
2530 /* Get the next character */
2531 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2532 /* if hit_eof, c2 will become '\0' */
2533 c2 = copy_raw_buf[raw_buf_ptr++];
2537 if (!cstate->csv_mode)
2539 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2540 errmsg("end-of-copy marker does not match previous newline style")));
2542 NO_END_OF_COPY_GOTO;
2544 else if (c2 != '\r')
2546 if (!cstate->csv_mode)
2548 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2549 errmsg("end-of-copy marker corrupt")));
2551 NO_END_OF_COPY_GOTO;
2555 /* Get the next character */
2556 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2557 /* if hit_eof, c2 will become '\0' */
2558 c2 = copy_raw_buf[raw_buf_ptr++];
2560 if (c2 != '\r' && c2 != '\n')
2562 if (!cstate->csv_mode)
2564 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2565 errmsg("end-of-copy marker corrupt")));
2567 NO_END_OF_COPY_GOTO;
2570 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
2571 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
2572 (cstate->eol_type == EOL_CR && c2 != '\r'))
2575 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2576 errmsg("end-of-copy marker does not match previous newline style")));
2580 * Transfer only the data before the \. into line_buf, then
2581 * discard the data and the \. sequence.
2583 if (prev_raw_ptr > cstate->raw_buf_index)
2584 appendBinaryStringInfo(&cstate->line_buf,
2585 cstate->raw_buf + cstate->raw_buf_index,
2586 prev_raw_ptr - cstate->raw_buf_index);
2587 cstate->raw_buf_index = raw_buf_ptr;
2588 result = true; /* report EOF */
2591 else if (!cstate->csv_mode)
2594 * If we are here, it means we found a backslash followed by
2595 * something other than a period. In non-CSV mode, anything
2596 * after a backslash is special, so we skip over that second
2597 * character too. If we didn't do that, \\. would be
2598 * considered an end-of-copy marker, while in non-CSV mode it is a
2599 * literal backslash followed by a period. In CSV mode,
2600 * backslashes are not special, so we want to process the
2601 * character after the backslash just like a normal character,
2602 * so we don't increment in those cases.
2608 * This label is for CSV cases where \. appears at the start of a
2609 * line, but there is more text after it, meaning it was a data value.
2610 * We are more strict for \. in CSV mode because \. could be a data
2611 * value, while in non-CSV mode, \. cannot be a data value.
2616 * Process all bytes of a multi-byte character as a group.
2618 * We only support multi-byte sequences where the first byte has the
2619 * high-bit set, so as an optimization we can avoid this block
2620 * entirely if it is not set.
2622 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
2627 /* All our encodings only read the first byte to get the length */
2628 mblen = pg_encoding_mblen(cstate->client_encoding, mblen_str);
2629 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
2630 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
2631 raw_buf_ptr += mblen - 1;
2633 first_char_in_line = false;
2634 } /* end of outer loop */
2637 * Transfer any still-uncopied data to line_buf.
2645 * Return decimal value for a hexadecimal digit
2648 GetDecimalFromHex(char hex)
2650 if (isdigit((unsigned char) hex))
2653 return tolower((unsigned char) hex) - 'a' + 10;
2657 * Parse the current line into separate attributes (fields),
2658 * performing de-escaping as needed.
2660 * The input is in line_buf. We use attribute_buf to hold the result
2661 * strings. fieldvals[k] is set to point to the k'th attribute string,
2662 * or NULL when the input matches the null marker string. (Note that the
2663 * caller cannot check for nulls since the returned string would be the
2664 * post-de-escaping equivalent, which may look the same as some valid data
2667 * delim is the column delimiter string (must be just one byte for now).
2668 * null_print is the null marker string. Note that this is compared to
2669 * the pre-de-escaped input string.
2671 * The return value is the number of fields actually read. (We error out
2672 * if this would exceed maxfields, which is the length of fieldvals[].)
2675 CopyReadAttributesText(CopyState cstate, int maxfields, char **fieldvals)
2677 char delimc = cstate->delim[0];
2684 * We need a special case for zero-column tables: check that the input
2685 * line is empty, and return.
2689 if (cstate->line_buf.len != 0)
2691 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2692 errmsg("extra data after last expected column")));
2696 resetStringInfo(&cstate->attribute_buf);
2699 * The de-escaped attributes will certainly not be longer than the input
2700 * data line, so we can just force attribute_buf to be large enough and
2701 * then transfer data without any checks for enough space. We need to do
2702 * it this way because enlarging attribute_buf mid-stream would invalidate
2703 * pointers already stored into fieldvals[].
2705 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
2706 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
2707 output_ptr = cstate->attribute_buf.data;
2709 /* set pointer variables for loop */
2710 cur_ptr = cstate->line_buf.data;
2711 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
2713 /* Outer loop iterates over fields */
2717 bool found_delim = false;
2721 bool saw_non_ascii = false;
2723 /* Make sure space remains in fieldvals[] */
2724 if (fieldno >= maxfields)
2726 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2727 errmsg("extra data after last expected column")));
2729 /* Remember start of field on both input and output sides */
2730 start_ptr = cur_ptr;
2731 fieldvals[fieldno] = output_ptr;
2733 /* Scan data for field */
2739 if (cur_ptr >= line_end_ptr)
2749 if (cur_ptr >= line_end_ptr)
2767 if (cur_ptr < line_end_ptr)
2773 val = (val << 3) + OCTVALUE(c);
2774 if (cur_ptr < line_end_ptr)
2780 val = (val << 3) + OCTVALUE(c);
2786 if (c == '\0' || IS_HIGHBIT_SET(c))
2787 saw_non_ascii = true;
2792 if (cur_ptr < line_end_ptr)
2794 char hexchar = *cur_ptr;
2796 if (isxdigit((unsigned char) hexchar))
2798 int val = GetDecimalFromHex(hexchar);
2801 if (cur_ptr < line_end_ptr)
2804 if (isxdigit((unsigned char) hexchar))
2807 val = (val << 4) + GetDecimalFromHex(hexchar);
2811 if (c == '\0' || IS_HIGHBIT_SET(c))
2812 saw_non_ascii = true;
2836 * in all other cases, take the char after '\'
2842 /* Add c to output string */
2846 /* Terminate attribute value in output area */
2847 *output_ptr++ = '\0';
2850 * If we de-escaped a non-7-bit-ASCII char, make sure we still
2851 * have valid data for the db encoding. Avoid calling strlen here for
2852 * the sake of efficiency.
2856 char *fld = fieldvals[fieldno];
2858 pg_verifymbstr(fld, output_ptr - (fld + 1), false);
2861 /* Check whether raw input matched null marker */
2862 input_len = end_ptr - start_ptr;
2863 if (input_len == cstate->null_print_len &&
2864 strncmp(start_ptr, cstate->null_print, input_len) == 0)
2865 fieldvals[fieldno] = NULL;
2868 /* Done if we hit EOL instead of a delim */
2873 /* Clean up state of attribute_buf */
2875 Assert(*output_ptr == '\0');
2876 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
2882 * Parse the current line into separate attributes (fields),
2883 * performing de-escaping as needed. This has exactly the same API as
2884 * CopyReadAttributesText, except we parse the fields according to
2885 * "standard" (i.e. common) CSV usage.
2888 CopyReadAttributesCSV(CopyState cstate, int maxfields, char **fieldvals)
2890 char delimc = cstate->delim[0];
2891 char quotec = cstate->quote[0];
2892 char escapec = cstate->escape[0];
2899 * We need a special case for zero-column tables: check that the input
2900 * line is empty, and return.
2904 if (cstate->line_buf.len != 0)
2906 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2907 errmsg("extra data after last expected column")));
2911 resetStringInfo(&cstate->attribute_buf);
2914 * The de-escaped attributes will certainly not be longer than the input
2915 * data line, so we can just force attribute_buf to be large enough and
2916 * then transfer data without any checks for enough space. We need to do
2917 * it this way because enlarging attribute_buf mid-stream would invalidate
2918 * pointers already stored into fieldvals[].
2920 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
2921 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
2922 output_ptr = cstate->attribute_buf.data;
2924 /* set pointer variables for loop */
2925 cur_ptr = cstate->line_buf.data;
2926 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
2928 /* Outer loop iterates over fields */
2932 bool found_delim = false;
2933 bool saw_quote = false;
2938 /* Make sure space remains in fieldvals[] */
2939 if (fieldno >= maxfields)
2941 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2942 errmsg("extra data after last expected column")));
2944 /* Remember start of field on both input and output sides */
2945 start_ptr = cur_ptr;
2946 fieldvals[fieldno] = output_ptr;
2948 /* Scan data for field,
2950 * The loop starts in "not quote" mode and then toggles between
2951 * that and "in quote" mode.
2952 * The loop exits normally if it is in "not quote" mode and a
2953 * delimiter or line end is seen.
2963 if (cur_ptr >= line_end_ptr)
2966 /* unquoted field delimiter */
2972 /* start of quoted field (or part of field) */
2978 /* Add c to output string */
2986 if (cur_ptr >= line_end_ptr)
2988 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2989 errmsg("unterminated CSV quoted field")));
2993 /* escape within a quoted field */
2997 * peek at the next char if available, and escape it if it is
2998 * an escape char or a quote char
3000 if (cur_ptr < line_end_ptr)
3002 char nextc = *cur_ptr;
3004 if (nextc == escapec || nextc == quotec)
3006 *output_ptr++ = nextc;
3013 * end of quoted field. Must do this test after testing for escape
3014 * in case quote char and escape char are the same (which is the
3020 /* Add c to output string */
3026 /* Terminate attribute value in output area */
3027 *output_ptr++ = '\0';
3029 /* Check whether raw input matched null marker */
3030 input_len = end_ptr - start_ptr;
3031 if (!saw_quote && input_len == cstate->null_print_len &&
3032 strncmp(start_ptr, cstate->null_print, input_len) == 0)
3033 fieldvals[fieldno] = NULL;
3036 /* Done if we hit EOL instead of a delim */
3041 /* Clean up state of attribute_buf */
3043 Assert(*output_ptr == '\0');
3044 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
3051 * Read a binary attribute
3054 CopyReadBinaryAttribute(CopyState cstate,
3055 int column_no, FmgrInfo *flinfo,
3056 Oid typioparam, int32 typmod,
3062 if (!CopyGetInt32(cstate, &fld_size))
3064 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3065 errmsg("unexpected EOF in COPY data")));
3069 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
3073 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3074 errmsg("invalid field size")));
3076 /* reset attribute_buf to empty, and load raw data in it */
3077 resetStringInfo(&cstate->attribute_buf);
3079 enlargeStringInfo(&cstate->attribute_buf, fld_size);
3080 if (CopyGetData(cstate, cstate->attribute_buf.data,
3081 fld_size, fld_size) != fld_size)
3083 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3084 errmsg("unexpected EOF in COPY data")));
3086 cstate->attribute_buf.len = fld_size;
3087 cstate->attribute_buf.data[fld_size] = '\0';
3089 /* Call the column type's binary input converter */
3090 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
3091 typioparam, typmod);
3093 /* Trouble if it didn't eat the whole buffer */
3094 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
3096 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
3097 errmsg("incorrect binary data format")));
3104 * Send text representation of one attribute, with conversion and escaping
3106 #define DUMPSOFAR() \
3109 CopySendData(cstate, start, ptr - start); \
3113 CopyAttributeOutText(CopyState cstate, char *string)
3118 char delimc = cstate->delim[0];
3120 if (cstate->need_transcoding)
3121 ptr = pg_server_to_client(string, strlen(string));
3126 * We have to grovel through the string searching for control characters
3127 * and instances of the delimiter character. In most cases, though, these
3128 * are infrequent. To avoid overhead from calling CopySendData once per
3129 * character, we dump out all characters between escaped characters in a
3130 * single call. The loop invariant is that the data from "start" to "ptr"
3131 * can be sent literally, but hasn't yet been.
3133 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
3134 * in valid backend encodings, extra bytes of a multibyte character never
3135 * look like ASCII. This loop is sufficiently performance-critical that
3136 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
3137 * of the normal safe-encoding path.
3139 if (cstate->encoding_embeds_ascii)
3142 while ((c = *ptr) != '\0')
3144 if ((unsigned char) c < (unsigned char) 0x20)
3147 * \r and \n must be escaped, the others are traditional.
3148 * We prefer to dump these using the C-like notation, rather
3149 * than a backslash and the literal character, because it
3150 * makes the dump file a bit more proof against Microsoftish
3174 /* If it's the delimiter, must backslash it */
3177 /* All ASCII control chars are length 1 */
3179 continue; /* fall to end of loop */
3181 /* if we get here, we need to convert the control char */
3183 CopySendChar(cstate, '\\');
3184 CopySendChar(cstate, c);
3185 start = ++ptr; /* do not include char in next run */
3187 else if (c == '\\' || c == delimc)
3190 CopySendChar(cstate, '\\');
3191 start = ptr++; /* we include char in next run */
3193 else if (IS_HIGHBIT_SET(c))
3194 ptr += pg_encoding_mblen(cstate->client_encoding, ptr);
3202 while ((c = *ptr) != '\0')
3204 if ((unsigned char) c < (unsigned char) 0x20)
3207 * \r and \n must be escaped, the others are traditional.
3208 * We prefer to dump these using the C-like notation, rather
3209 * than a backslash and the literal character, because it
3210 * makes the dump file a bit more proof against Microsoftish
3234 /* If it's the delimiter, must backslash it */
3237 /* All ASCII control chars are length 1 */
3239 continue; /* fall to end of loop */
3241 /* if we get here, we need to convert the control char */
3243 CopySendChar(cstate, '\\');
3244 CopySendChar(cstate, c);
3245 start = ++ptr; /* do not include char in next run */
3247 else if (c == '\\' || c == delimc)
3250 CopySendChar(cstate, '\\');
3251 start = ptr++; /* we include char in next run */
3262 * Send text representation of one attribute, with conversion and
3263 * CSV-style escaping
3266 CopyAttributeOutCSV(CopyState cstate, char *string,
3267 bool use_quote, bool single_attr)
3272 char delimc = cstate->delim[0];
3273 char quotec = cstate->quote[0];
3274 char escapec = cstate->escape[0];
3276 /* force quoting if it matches null_print (before conversion!) */
3277 if (!use_quote && strcmp(string, cstate->null_print) == 0)
3280 if (cstate->need_transcoding)
3281 ptr = pg_server_to_client(string, strlen(string));
3286 * Make a preliminary pass to discover if it needs quoting
3291 * Because '\.' can be a data value, quote it if it appears alone on a
3292 * line so it is not interpreted as the end-of-data marker.
3294 if (single_attr && strcmp(ptr, "\\.") == 0)
3300 while ((c = *tptr) != '\0')
3302 if (c == delimc || c == quotec || c == '\n' || c == '\r')
3307 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3308 tptr += pg_encoding_mblen(cstate->client_encoding, tptr);
3317 CopySendChar(cstate, quotec);
3320 * We adopt the same optimization strategy as in CopyAttributeOutText
3323 while ((c = *ptr) != '\0')
3325 if (c == quotec || c == escapec)
3328 CopySendChar(cstate, escapec);
3329 start = ptr; /* we include char in next run */
3331 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3332 ptr += pg_encoding_mblen(cstate->client_encoding, ptr);
3338 CopySendChar(cstate, quotec);
3342 /* If it doesn't need quoting, we can just dump it as-is */
3343 CopySendString(cstate, ptr);
3348 * CopyGetAttnums - build an integer list of attnums to be copied
3350 * The input attnamelist is either the user-specified column list,
3351 * or NIL if there was none (in which case we want all the non-dropped
3354 * rel can be NULL ... it's only used for error reports.
3357 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
3359 List *attnums = NIL;
3361 if (attnamelist == NIL)
3363 /* Generate default column list */
3364 Form_pg_attribute *attr = tupDesc->attrs;
3365 int attr_count = tupDesc->natts;
3368 for (i = 0; i < attr_count; i++)
3370 if (attr[i]->attisdropped)
3372 attnums = lappend_int(attnums, i + 1);
3377 /* Validate the user-supplied list and extract attnums */
3380 foreach(l, attnamelist)
3382 char *name = strVal(lfirst(l));
3386 /* Lookup column name */
3387 attnum = InvalidAttrNumber;
3388 for (i = 0; i < tupDesc->natts; i++)
3390 if (tupDesc->attrs[i]->attisdropped)
3392 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
3394 attnum = tupDesc->attrs[i]->attnum;
3398 if (attnum == InvalidAttrNumber)
3402 (errcode(ERRCODE_UNDEFINED_COLUMN),
3403 errmsg("column \"%s\" of relation \"%s\" does not exist",
3404 name, RelationGetRelationName(rel))));
3407 (errcode(ERRCODE_UNDEFINED_COLUMN),
3408 errmsg("column \"%s\" does not exist",
3411 /* Check for duplicates */
3412 if (list_member_int(attnums, attnum))
3414 (errcode(ERRCODE_DUPLICATE_COLUMN),
3415 errmsg("column \"%s\" specified more than once",
3417 attnums = lappend_int(attnums, attnum);
3426 * copy_dest_startup --- executor startup
3429 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
3435 * copy_dest_receive --- receive one tuple
3438 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
3440 DR_copy *myState = (DR_copy *) self;
3441 CopyState cstate = myState->cstate;
3443 /* Make sure the tuple is fully deconstructed */
3444 slot_getallattrs(slot);
3446 /* And send the data */
3447 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
3451 * copy_dest_shutdown --- executor end
3454 copy_dest_shutdown(DestReceiver *self)
3460 * copy_dest_destroy --- release DestReceiver object
3463 copy_dest_destroy(DestReceiver *self)
3469 * CreateCopyDestReceiver -- create a suitable DestReceiver object
3472 CreateCopyDestReceiver(void)
3474 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
3476 self->pub.receiveSlot = copy_dest_receive;
3477 self->pub.rStartup = copy_dest_startup;
3478 self->pub.rShutdown = copy_dest_shutdown;
3479 self->pub.rDestroy = copy_dest_destroy;
3480 self->pub.mydest = DestCopyOut;
3482 self->cstate = NULL; /* will be set later */
3484 return (DestReceiver *) self;