1 /*-------------------------------------------------------------------------
4 * Implements the COPY utility command
6 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/commands/copy.c
13 *-------------------------------------------------------------------------
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
23 #include "access/heapam.h"
24 #include "access/sysattr.h"
25 #include "access/xact.h"
26 #include "catalog/namespace.h"
27 #include "catalog/pg_type.h"
28 #include "commands/copy.h"
29 #include "commands/defrem.h"
30 #include "commands/trigger.h"
31 #include "executor/executor.h"
32 #include "libpq/libpq.h"
33 #include "libpq/pqformat.h"
34 #include "mb/pg_wchar.h"
35 #include "miscadmin.h"
36 #include "optimizer/planner.h"
37 #include "parser/parse_relation.h"
38 #include "rewrite/rewriteHandler.h"
39 #include "storage/fd.h"
40 #include "tcop/tcopprot.h"
41 #include "utils/acl.h"
42 #include "utils/builtins.h"
43 #include "utils/lsyscache.h"
44 #include "utils/memutils.h"
45 #include "utils/snapmgr.h"
/* True when c is an ASCII octal digit (0 through 7); c is expanded twice, so avoid side effects in the argument. */
48 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
/* Numeric value of the ASCII digit character c. */
49 #define OCTVALUE(c) ((c) - '0')
52 * Represents the different source/dest cases we need to handle for COPY I/O:
57 COPY_FILE, /* to/from file */
58 COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
59 COPY_NEW_FE /* to/from frontend (3.0 protocol) */
63 * Represents the end-of-line terminator type of the input
74 * This struct contains all the state variables used throughout a COPY
75 * operation. For simplicity, we use the same struct for all variants of COPY,
76 * even though some fields are used in only some cases.
78 * Multi-byte encodings: all supported client-side encodings encode multi-byte
79 * characters by having the first byte's high bit set. Subsequent bytes of the
80 * character can have the high bit not set. When scanning data in such an
81 * encoding to look for a match to a single-byte (ie ASCII) character, we must
82 * use the full pg_encoding_mblen() machinery to skip over multibyte
83 * characters, else we might find a false match to a trailing byte. In
84 * supported server encodings, there is no possibility of a false match, and
85 * it's faster to make useless comparisons to trailing bytes than it is to
86 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
87 * when we have to do it the hard way.
/*
 * CopyStateData: all state for one COPY operation, grouped below into
 * low-level I/O state, parameters taken from the COPY command, error-report
 * context, and working state for the TO and FROM directions.
 */
89 typedef struct CopyStateData
91 /* low-level state data */
92 CopyDest copy_dest; /* type of copy source/destination */
93 FILE *copy_file; /* used if copy_dest == COPY_FILE */
94 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
95 * dest == COPY_NEW_FE in COPY FROM */
96 bool fe_eof; /* true if detected end of copy data */
97 EolType eol_type; /* EOL type of input */
98 int file_encoding; /* file or remote side's character encoding */
99 bool need_transcoding; /* file encoding diff from server? */
100 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
102 /* parameters from the COPY command */
103 Relation rel; /* relation to copy to or from */
104 QueryDesc *queryDesc; /* executable query to copy from */
105 List *attnumlist; /* integer list of attnums to copy */
106 char *filename; /* filename, or NULL for STDIN/STDOUT */
107 bool binary; /* binary format? */
108 bool oids; /* include OIDs? */
109 bool csv_mode; /* Comma Separated Value format? */
110 bool header_line; /* CSV header line? */
111 char *null_print; /* NULL marker string (server encoding!) */
112 int null_print_len; /* length of same */
113 char *null_print_client; /* same converted to file encoding */
114 char *delim; /* column delimiter (must be 1 byte) */
115 char *quote; /* CSV quote char (must be 1 byte) */
116 char *escape; /* CSV escape char (must be 1 byte) */
117 List *force_quote; /* list of column names */
118 bool force_quote_all; /* FORCE QUOTE *? */
119 bool *force_quote_flags; /* per-column CSV FQ flags */
120 List *force_notnull; /* list of column names */
121 bool *force_notnull_flags; /* per-column CSV FNN flags */
123 /* these are just for error messages, see CopyFromErrorCallback */
124 const char *cur_relname; /* table name for error messages */
125 int cur_lineno; /* line number for error messages */
126 const char *cur_attname; /* current att for error messages */
127 const char *cur_attval; /* current att value for error messages */
130 * Working state for COPY TO/FROM
132 MemoryContext copycontext; /* per-copy execution context */
135 * Working state for COPY TO
137 FmgrInfo *out_functions; /* lookup info for output functions */
138 MemoryContext rowcontext; /* per-row evaluation context */
141 * Working state for COPY FROM
/* NOTE(review): presumably the number of valid entries in defmap and defexprs below -- confirm. */
143 AttrNumber num_defaults;
/* NOTE(review): presumably the input-function lookup info used for the OID column -- confirm. */
145 FmgrInfo oid_in_function;
147 FmgrInfo *in_functions; /* array of input functions for each attrs */
148 Oid *typioparams; /* array of element types for in_functions */
149 int *defmap; /* array of default att numbers */
150 ExprState **defexprs; /* array of default att expressions */
153 * These variables are used to reduce overhead in textual COPY FROM.
155 * attribute_buf holds the separated, de-escaped text for each field of
156 * the current line. The CopyReadAttributes functions return arrays of
157 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
158 * the buffer on each cycle.
160 StringInfoData attribute_buf;
162 /* field raw data pointers found by COPY FROM */
168 * Similarly, line_buf holds the whole input line being processed. The
169 * input cycle is first to read the whole line into line_buf, convert it
170 * to server encoding there, and then extract the individual attribute
171 * fields into attribute_buf. line_buf is preserved unmodified so that we
172 * can display it in error messages if appropriate.
174 StringInfoData line_buf;
175 bool line_buf_converted; /* converted to server encoding? */
178 * Finally, raw_buf holds raw data read from the data source (file or
179 * client connection). CopyReadLine parses this data sufficiently to
180 * locate line boundaries, then transfers the data to line_buf and
181 * converts it. Note: we guarantee that there is a \0 at
182 * raw_buf[raw_buf_len].
184 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
186 int raw_buf_index; /* next byte to process */
187 int raw_buf_len; /* total # of bytes stored */
190 /* DestReceiver for COPY (SELECT) TO */
193 DestReceiver pub; /* publicly-known function pointers */
194 CopyState cstate; /* CopyStateData for the command */
195 uint64 processed; /* # of tuples processed */
200 * These macros centralize code used to process line_buf and raw_buf buffers.
201 * They are macros because they often do continue/break control and to avoid
202 * function call overhead in tight COPY loops.
204 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
205 * prevent the continue/break processing from working. We end the "if (1)"
206 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
207 * any "else" in the calling code, and to avoid any compiler warnings about
208 * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
212 * This keeps the character read at the top of the loop in the buffer
213 * even if there is more than one read-ahead.
/*
 * Not hygienic: this macro reads and writes the expanding function's locals
 * raw_buf_ptr, prev_raw_ptr, copy_buf_len and hit_eof.  It fires when fewer
 * than extralen + 1 bytes remain past raw_buf_ptr and EOF has not been hit,
 * and then rewinds raw_buf_ptr to prev_raw_ptr.
 */
215 #define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
218 if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
220 raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
226 /* This consumes the remainder of the buffer and breaks */
/* Same test as the macro above but for the hit_eof case; uses the same caller locals. */
227 #define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
230 if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
233 raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
234 /* backslash just before EOF, treat as data char */ \
241 * Transfer any approved data to line_buf; must do this to be sure
242 * there is some room in raw_buf.
/*
 * Appends raw_buf[raw_buf_index .. raw_buf_ptr) to cstate->line_buf and then
 * advances cstate->raw_buf_index to raw_buf_ptr.  Requires cstate and
 * raw_buf_ptr to be in scope at the point of use.
 */
244 #define REFILL_LINEBUF \
247 if (raw_buf_ptr > cstate->raw_buf_index) \
249 appendBinaryStringInfo(&cstate->line_buf, \
250 cstate->raw_buf + cstate->raw_buf_index, \
251 raw_buf_ptr - cstate->raw_buf_index); \
252 cstate->raw_buf_index = raw_buf_ptr; \
256 /* Undo any read-ahead and jump out of the block. */
/* Leaves raw_buf_ptr one byte past prev_raw_ptr; the expanding function must define the label not_end_of_copy. */
257 #define NO_END_OF_COPY_GOTO \
260 raw_buf_ptr = prev_raw_ptr + 1; \
261 goto not_end_of_copy; \
/*
 * Eleven bytes: the letters PGCOPY, newline, octal byte 377, carriage return,
 * newline, NUL.  The explicit array length of 11 exactly fits those bytes, so
 * the implicit terminating NUL of the string literal is not stored.
 * NOTE(review): presumably the leading signature of binary-format COPY data;
 * confirm against the code that reads and writes it.
 */
264 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
267 /* non-export function prototypes */
268 static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
269 const char *queryString, List *attnamelist, List *options);
270 static void EndCopy(CopyState cstate);
271 static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
272 const char *filename, List *attnamelist, List *options);
273 static void EndCopyTo(CopyState cstate);
274 static uint64 DoCopyTo(CopyState cstate);
275 static uint64 CopyTo(CopyState cstate);
276 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
277 Datum *values, bool *nulls);
278 static uint64 CopyFrom(CopyState cstate);
279 static bool CopyReadLine(CopyState cstate);
280 static bool CopyReadLineText(CopyState cstate);
281 static int CopyReadAttributesText(CopyState cstate);
282 static int CopyReadAttributesCSV(CopyState cstate);
283 static Datum CopyReadBinaryAttribute(CopyState cstate,
284 int column_no, FmgrInfo *flinfo,
285 Oid typioparam, int32 typmod,
287 static void CopyAttributeOutText(CopyState cstate, char *string);
288 static void CopyAttributeOutCSV(CopyState cstate, char *string,
289 bool use_quote, bool single_attr);
290 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
292 static char *limit_printout_length(const char *str);
294 /* Low-level communications functions */
295 static void SendCopyBegin(CopyState cstate);
296 static void ReceiveCopyBegin(CopyState cstate);
297 static void SendCopyEnd(CopyState cstate);
298 static void CopySendData(CopyState cstate, void *databuf, int datasize);
299 static void CopySendString(CopyState cstate, const char *str);
300 static void CopySendChar(CopyState cstate, char c);
301 static void CopySendEndOfRow(CopyState cstate);
302 static int CopyGetData(CopyState cstate, void *databuf,
303 int minread, int maxread);
304 static void CopySendInt32(CopyState cstate, int32 val);
305 static bool CopyGetInt32(CopyState cstate, int32 *val);
306 static void CopySendInt16(CopyState cstate, int16 val);
307 static bool CopyGetInt16(CopyState cstate, int16 *val);
311 * Send copy start/stop messages for frontend copies. These have changed
312 * in past protocol redesigns.
/*
 * SendCopyBegin: tell the frontend that COPY data is about to be sent, and
 * record in cstate->copy_dest which protocol variant is in use.
 *
 * NOTE(review): this listing omits several short source lines (braces, local
 * declarations, the else keyword, the ereport openers and the conditions
 * guarding them), so the comments describe only what the visible lines show.
 */
315 SendCopyBegin(CopyState cstate)
317 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
/* Protocol 3 and later: build a type H message from one format byte, a 2-byte column count, and one 2-byte format code per column. */
321 int natts = list_length(cstate->attnumlist);
/* format code is 1 for binary COPY, 0 otherwise */
322 int16 format = (cstate->binary ? 1 : 0);
325 pq_beginmessage(&buf, 'H');
326 pq_sendbyte(&buf, format); /* overall format */
327 pq_sendint(&buf, natts, 2);
328 for (i = 0; i < natts; i++)
329 pq_sendint(&buf, format, 2); /* per-column formats */
331 cstate->copy_dest = COPY_NEW_FE;
333 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
/* Protocol 2: an empty type H message; the error below is presumably raised only for binary COPY (its guard is not visible). */
338 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
339 errmsg("COPY BINARY is not supported to stdout or from stdin")));
340 pq_putemptymessage('H');
341 /* grottiness needed for old COPY OUT protocol */
343 cstate->copy_dest = COPY_OLD_FE;
/* Remaining (older protocol) case: same handling, but the empty message has type B. */
350 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
351 errmsg("COPY BINARY is not supported to stdout or from stdin")));
352 pq_putemptymessage('B');
353 /* grottiness needed for old COPY OUT protocol */
355 cstate->copy_dest = COPY_OLD_FE;
/*
 * ReceiveCopyBegin: tell the frontend that the backend is ready to receive
 * COPY data, and record in cstate->copy_dest which protocol variant is in
 * use.  Mirrors SendCopyBegin, with message types G (and D for the oldest
 * protocol) instead of H and B.
 *
 * NOTE(review): braces, local declarations, the ereport openers with their
 * guards, and the final flush call are not visible in this listing.
 */
360 ReceiveCopyBegin(CopyState cstate)
362 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
/* Protocol 3 and later: type G message with overall format, column count and per-column format codes. */
366 int natts = list_length(cstate->attnumlist);
367 int16 format = (cstate->binary ? 1 : 0);
370 pq_beginmessage(&buf, 'G');
371 pq_sendbyte(&buf, format); /* overall format */
372 pq_sendint(&buf, natts, 2);
373 for (i = 0; i < natts; i++)
374 pq_sendint(&buf, format, 2); /* per-column formats */
376 cstate->copy_dest = COPY_NEW_FE;
/* only this protocol variant allocates a message buffer for incoming data */
377 cstate->fe_msgbuf = makeStringInfo();
379 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
384 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
385 errmsg("COPY BINARY is not supported to stdout or from stdin")));
386 pq_putemptymessage('G');
387 cstate->copy_dest = COPY_OLD_FE;
/* Remaining (older protocol) case: same handling with an empty type D message. */
394 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
395 errmsg("COPY BINARY is not supported to stdout or from stdin")));
396 pq_putemptymessage('D');
397 cstate->copy_dest = COPY_OLD_FE;
399 /* We *must* flush here to ensure FE knows it can send. */
/*
 * SendCopyEnd: terminate a COPY-out data stream.  For COPY_NEW_FE an empty
 * type c message is sent; the other visible path emits a backslash-period
 * marker as a final row and then calls pq_endcopyout(false).
 * NOTE(review): the else separating the two paths is not visible here.
 */
404 SendCopyEnd(CopyState cstate)
406 if (cstate->copy_dest == COPY_NEW_FE)
408 /* Shouldn't have any unsent data */
409 Assert(cstate->fe_msgbuf->len == 0);
410 /* Send Copy Done message */
411 pq_putemptymessage('c');
/* two bytes: a backslash followed by a period */
415 CopySendData(cstate, "\\.", 2);
416 /* Need to flush out the trailer (this also appends a newline) */
417 CopySendEndOfRow(cstate);
418 pq_endcopyout(false);
423 * CopySendData sends output data to the destination (file or frontend)
424 * CopySendString does the same for null-terminated strings
425 * CopySendChar does the same for single characters
426 * CopySendEndOfRow does the appropriate thing at end of each data row
427 * (data is not actually flushed except by CopySendEndOfRow)
429 * NB: no data conversion is applied by these functions
/* Append datasize bytes starting at databuf to cstate->fe_msgbuf; nothing is transmitted here. */
433 CopySendData(CopyState cstate, void *databuf, int datasize)
435 appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize);
/* Append the NUL-terminated string str (without its terminator) to cstate->fe_msgbuf. */
439 CopySendString(CopyState cstate, const char *str)
441 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
/* Append the single character c to cstate->fe_msgbuf. */
445 CopySendChar(CopyState cstate, char c)
447 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
/*
 * CopySendEndOfRow: finish the row accumulated in cstate->fe_msgbuf, hand it
 * to the destination selected by cstate->copy_dest, then reset the buffer so
 * the next row starts empty.
 *
 * NOTE(review): the case labels, the conditions guarding the newline
 * appends, the platform preprocessor lines and the ereport openers are not
 * visible in this listing.
 */
451 CopySendEndOfRow(CopyState cstate)
453 StringInfo fe_msgbuf = cstate->fe_msgbuf;
455 switch (cstate->copy_dest)
460 /* Default line termination depends on platform */
462 CopySendChar(cstate, '\n');
464 CopySendString(cstate, "\r\n");
/*
 * NOTE(review): the fwrite result is discarded and only ferror() is
 * consulted afterwards; checking that fwrite returned 1 would also catch a
 * short write.  Consider testing both.
 */
468 (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
469 1, cstate->copy_file);
470 if (ferror(cstate->copy_file))
472 (errcode_for_file_access(),
473 errmsg("could not write to COPY file: %m")));
476 /* The FE/BE protocol uses \n as newline for all platforms */
478 CopySendChar(cstate, '\n');
/* a nonzero result from pq_putbytes is treated as a lost connection */
480 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
482 /* no hope of recovering connection sync, so FATAL */
484 (errcode(ERRCODE_CONNECTION_FAILURE),
485 errmsg("connection lost during COPY to stdout")));
489 /* The FE/BE protocol uses \n as newline for all platforms */
491 CopySendChar(cstate, '\n');
493 /* Dump the accumulated row as one CopyData message */
/* NOTE(review): the pq_putmessage result is deliberately ignored here, unlike pq_putbytes above -- confirm that is intended. */
494 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
498 resetStringInfo(fe_msgbuf);
502 * CopyGetData reads data from the source (file or frontend)
504 * We attempt to read at least minread, and at most maxread, bytes from
505 * the source. The actual number of bytes read is returned; if this is
506 * less than minread, EOF was detected.
508 * Note: when copying from the frontend, we expect a proper EOF mark per
509 * protocol; if the frontend simply drops the connection, we raise error.
510 * It seems unwise to allow the COPY IN to complete normally in that case.
512 * NB: no data conversion is applied here.
/*
 * CopyGetData: read raw bytes into databuf from the source selected by
 * cstate->copy_dest (file via fread, old-protocol frontend via pq_getbytes,
 * new-protocol frontend via framed messages buffered in cstate->fe_msgbuf).
 *
 * NOTE(review): the case labels, local declarations, ereport openers and the
 * bookkeeping that updates bytesread and maxread are not visible in this
 * listing; the comments cover only the visible lines.
 */
515 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
519 switch (cstate->copy_dest)
/* file source: request up to maxread bytes, then check the stream error flag */
522 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
523 if (ferror(cstate->copy_file))
525 (errcode_for_file_access(),
526 errmsg("could not read from COPY file: %m")));
531 * We cannot read more than minread bytes (which in practice is 1)
532 * because old protocol doesn't have any clear way of separating
533 * the COPY stream from following data. This is slow, but not any
534 * slower than the code path was originally, and we don't care
535 * much anymore about the performance of old protocol.
537 if (pq_getbytes((char *) databuf, minread))
539 /* Only a \. terminator is legal EOF in old protocol */
541 (errcode(ERRCODE_CONNECTION_FAILURE),
542 errmsg("unexpected EOF on client connection")));
/* new-protocol source: keep going until minread bytes are gathered, the caller's space runs out, or end of copy was signalled */
547 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
/* the buffered message is used up once cursor reaches len, so fetch another */
551 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
553 /* Try to receive another message */
557 mtype = pq_getbyte();
560 (errcode(ERRCODE_CONNECTION_FAILURE),
561 errmsg("unexpected EOF on client connection")));
562 if (pq_getmessage(cstate->fe_msgbuf, 0))
564 (errcode(ERRCODE_CONNECTION_FAILURE),
565 errmsg("unexpected EOF on client connection")));
568 case 'd': /* CopyData */
570 case 'c': /* CopyDone */
571 /* COPY IN correctly terminated by frontend */
572 cstate->fe_eof = true;
574 case 'f': /* CopyFail */
576 (errcode(ERRCODE_QUERY_CANCELED),
577 errmsg("COPY from stdin failed: %s",
578 pq_getmsgstring(cstate->fe_msgbuf))));
580 case 'H': /* Flush */
584 * Ignore Flush/Sync for the convenience of client
585 * libraries (such as libpq) that may send those
586 * without noticing that the command they just
592 (errcode(ERRCODE_PROTOCOL_VIOLATION),
593 errmsg("unexpected message type 0x%02X during COPY from stdin",
/* bytes still unread in the current message */
598 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
601 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
/* advance the output pointer past the bytes just copied */
602 databuf = (void *) ((char *) databuf + avail);
614 * These functions do apply some data conversion
618 * CopySendInt32 sends an int32 in network byte order
/* Convert val to network byte order with htonl and queue its raw bytes via CopySendData. */
621 CopySendInt32(CopyState cstate, int32 val)
625 buf = htonl((uint32) val);
626 CopySendData(cstate, &buf, sizeof(buf));
630 * CopyGetInt32 reads an int32 that appears in network byte order
632 * Returns true if OK, false if EOF
/*
 * Read exactly sizeof(buf) bytes via CopyGetData and store the value,
 * converted from network byte order, into *val.  On a short read *val is set
 * to 0.  NOTE(review): the return statements are not visible in this listing.
 */
635 CopyGetInt32(CopyState cstate, int32 *val)
/* minread and maxread are both the full width, so anything less is a short read */
639 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
641 *val = 0; /* suppress compiler warning */
644 *val = (int32) ntohl(buf);
649 * CopySendInt16 sends an int16 in network byte order
/* Convert val to network byte order with htons and queue its raw bytes via CopySendData. */
652 CopySendInt16(CopyState cstate, int16 val)
656 buf = htons((uint16) val);
657 CopySendData(cstate, &buf, sizeof(buf));
661 * CopyGetInt16 reads an int16 that appears in network byte order
/*
 * Read exactly sizeof(buf) bytes via CopyGetData and store the value,
 * converted from network byte order, into *val.  On a short read *val is set
 * to 0.  NOTE(review): the return statements are not visible in this listing.
 */
664 CopyGetInt16(CopyState cstate, int16 *val)
668 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
670 *val = 0; /* suppress compiler warning */
673 *val = (int16) ntohs(buf);
679 * CopyLoadRawBuf loads some more data into raw_buf
681 * Returns TRUE if able to obtain at least one more byte, else FALSE.
683 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
684 * down to the start of the buffer and then we load more data after that.
685 * This case is used only when a frontend multibyte character crosses a
686 * bufferload boundary.
/*
 * CopyLoadRawBuf: refill cstate->raw_buf.  Any unprocessed tail
 * (raw_buf_index up to raw_buf_len) is first moved to the front of the
 * buffer; new data is then requested from CopyGetData into the space after
 * it, asking for at least 1 and at most RAW_BUF_SIZE - nbytes bytes.  The
 * result is nonzero when CopyGetData delivered at least one byte.
 *
 * NOTE(review): the listing has a numbering gap between the CopyGetData call
 * and the line that writes the terminator.  As shown, nbytes still counts
 * only the carried-over bytes when it is stored into raw_buf_len; confirm
 * that the omitted line adds inbytes to nbytes.
 */
689 CopyLoadRawBuf(CopyState cstate)
694 if (cstate->raw_buf_index < cstate->raw_buf_len)
696 /* Copy down the unprocessed data */
697 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
/* memmove rather than memcpy: source and destination lie in the same buffer */
698 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
702 nbytes = 0; /* no data need be saved */
704 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
705 1, RAW_BUF_SIZE - nbytes);
/* keep a NUL just past the stored data */
707 cstate->raw_buf[nbytes] = '\0';
708 cstate->raw_buf_index = 0;
709 cstate->raw_buf_len = nbytes;
710 return (inbytes > 0);
715 * DoCopy executes the SQL COPY statement
717 * Either unload or reload the contents of a table, depending on
718 * stmt->is_from (TRUE means we are inserting into the table). In the "TO"
719 * case we also support copying the output of an arbitrary SELECT query.
721 * If stmt->filename is set, transfer is between the table and the file it
722 * names. Otherwise (the "pipe" case), transfer is between the table and our
723 * regular input/output stream. The latter could be either stdin/stdout or a
724 * socket, depending on whether we're running under Postmaster control.
726 * Do not allow a Postgres user without superuser privilege to read from
727 * or write to a file.
729 * Do not allow the copy if user doesn't have proper permission to access
730 * the table or the specifically requested columns.
/*
 * DoCopy: top-level execution of a COPY statement.
 *
 * stmt        - parsed COPY statement; is_from selects the direction and a
 *               NULL filename selects the frontend stream instead of a file.
 * queryString - source text, passed on to BeginCopyTo.
 *
 * Visible steps: superuser check for file COPY, opening the relation with a
 * direction-dependent lock, a per-column permission check through a
 * RangeTblEntry built on the spot, dispatch to the FROM or TO machinery, and
 * closing the relation.
 *
 * NOTE(review): braces, local declarations, the branch keywords selecting
 * between the relation and query cases, and the return statement are not
 * visible in this listing.
 */
733 DoCopy(const CopyStmt *stmt, const char *queryString)
736 bool is_from = stmt->is_from;
/* no filename means the data travels over the frontend stream */
737 bool pipe = (stmt->filename == NULL);
741 /* Disallow file COPY except to superusers. */
742 if (!pipe && !superuser())
744 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
745 errmsg("must be superuser to COPY to or from a file"),
746 errhint("Anyone can COPY to stdout or from stdin. "
747 "psql's \\copy command also works for anyone.")));
/* loading needs INSERT privilege, unloading needs SELECT */
752 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
757 Assert(!stmt->query);
759 /* Open and lock the relation, using the appropriate lock type. */
760 rel = heap_openrv(stmt->relation,
761 (is_from ? RowExclusiveLock : AccessShareLock));
/* build a minimal range-table entry so the standard permission checker can be used */
763 rte = makeNode(RangeTblEntry);
764 rte->rtekind = RTE_RELATION;
765 rte->relid = RelationGetRelid(rel);
766 rte->relkind = rel->rd_rel->relkind;
767 rte->requiredPerms = required_access;
769 tupDesc = RelationGetDescr(rel);
770 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
771 foreach(cur, attnums)
/* column bitmaps are indexed by attribute number offset by FirstLowInvalidHeapAttributeNumber */
773 int attno = lfirst_int(cur) -
774 FirstLowInvalidHeapAttributeNumber;
777 rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
779 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
781 ExecCheckRTPerms(list_make1(rte), true);
792 /* check read-only transaction */
/* relations whose rd_backend equals MyBackendId are exempt from the read-only check */
793 if (XactReadOnly && rel->rd_backend != MyBackendId)
794 PreventCommandIfReadOnly("COPY FROM");
796 cstate = BeginCopyFrom(rel, stmt->filename,
797 stmt->attlist, stmt->options);
798 processed = CopyFrom(cstate); /* copy from file to database */
803 cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
804 stmt->attlist, stmt->options);
805 processed = DoCopyTo(cstate); /* copy from database to file */
810 * Close the relation. If reading, we can release the AccessShareLock we
811 * got; if writing, we should hold the lock until end of transaction to
812 * ensure that updates will be committed before lock is released.
815 heap_close(rel, (is_from ? NoLock : AccessShareLock));
821 * Process the statement option list for COPY.
823 * Scan the options list (a list of DefElem) and transpose the information
824 * into cstate, applying appropriate error checking.
826 * cstate is assumed to be filled with zeroes initially.
828 * This is exported so that external users of the COPY API can sanity-check
829 * a list of options. In that usage, cstate should be passed as NULL
830 * (since external users don't know sizeof(CopyStateData)) and the collected
831 * data is just leaked until CurrentMemoryContext is reset.
833 * Note that additional checking, such as whether column names listed in FORCE
834 * QUOTE actually exist, has to be applied later. This just checks for
835 * self-consistency of the options list.
/*
 * ProcessCopyOptions: walk the DefElem option list of a COPY statement, store
 * each recognised option in cstate, fill in defaults, and reject option
 * combinations that are inconsistent with each other or with the direction
 * (is_from).
 *
 * Option names handled below: format, oids, delimiter, null, header, quote,
 * escape, force_quote, force_not_null, encoding.
 *
 * NOTE(review): the remaining parameter lines, braces, several guarding if
 * lines and every ereport opener are not visible in this listing.
 */
838 ProcessCopyOptions(CopyState cstate,
842 bool format_specified = false;
845 /* Support external use for option sanity checking */
847 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
/* -1 marks the encoding as not yet specified; see the duplicate check under encoding */
849 cstate->file_encoding = -1;
851 /* Extract options from the statement node tree */
852 foreach(option, options)
854 DefElem *defel = (DefElem *) lfirst(option);
856 if (strcmp(defel->defname, "format") == 0)
858 char *fmt = defGetString(defel);
/* a separate flag is needed because the text format leaves no trace in cstate */
860 if (format_specified)
862 (errcode(ERRCODE_SYNTAX_ERROR),
863 errmsg("conflicting or redundant options")));
864 format_specified = true;
865 if (strcmp(fmt, "text") == 0)
866 /* default format */ ;
867 else if (strcmp(fmt, "csv") == 0)
868 cstate->csv_mode = true;
869 else if (strcmp(fmt, "binary") == 0)
870 cstate->binary = true;
873 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
874 errmsg("COPY format \"%s\" not recognized", fmt)));
876 else if (strcmp(defel->defname, "oids") == 0)
880 (errcode(ERRCODE_SYNTAX_ERROR),
881 errmsg("conflicting or redundant options")));
882 cstate->oids = defGetBoolean(defel);
884 else if (strcmp(defel->defname, "delimiter") == 0)
888 (errcode(ERRCODE_SYNTAX_ERROR),
889 errmsg("conflicting or redundant options")));
890 cstate->delim = defGetString(defel);
892 else if (strcmp(defel->defname, "null") == 0)
894 if (cstate->null_print)
896 (errcode(ERRCODE_SYNTAX_ERROR),
897 errmsg("conflicting or redundant options")));
898 cstate->null_print = defGetString(defel);
900 else if (strcmp(defel->defname, "header") == 0)
/*
 * NOTE(review): duplicates are detected from the stored value, so an earlier
 * header option that evaluated to false is not noticed when header is given
 * again.
 */
902 if (cstate->header_line)
904 (errcode(ERRCODE_SYNTAX_ERROR),
905 errmsg("conflicting or redundant options")));
906 cstate->header_line = defGetBoolean(defel);
908 else if (strcmp(defel->defname, "quote") == 0)
912 (errcode(ERRCODE_SYNTAX_ERROR),
913 errmsg("conflicting or redundant options")));
914 cstate->quote = defGetString(defel);
916 else if (strcmp(defel->defname, "escape") == 0)
920 (errcode(ERRCODE_SYNTAX_ERROR),
921 errmsg("conflicting or redundant options")));
922 cstate->escape = defGetString(defel);
924 else if (strcmp(defel->defname, "force_quote") == 0)
926 if (cstate->force_quote || cstate->force_quote_all)
928 (errcode(ERRCODE_SYNTAX_ERROR),
929 errmsg("conflicting or redundant options")));
/* the argument is either a star (all columns) or an explicit column list */
930 if (defel->arg && IsA(defel->arg, A_Star))
931 cstate->force_quote_all = true;
932 else if (defel->arg && IsA(defel->arg, List))
933 cstate->force_quote = (List *) defel->arg;
936 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
937 errmsg("argument to option \"%s\" must be a list of column names",
940 else if (strcmp(defel->defname, "force_not_null") == 0)
942 if (cstate->force_notnull)
944 (errcode(ERRCODE_SYNTAX_ERROR),
945 errmsg("conflicting or redundant options")));
946 if (defel->arg && IsA(defel->arg, List))
947 cstate->force_notnull = (List *) defel->arg;
950 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
951 errmsg("argument to option \"%s\" must be a list of column names",
954 else if (strcmp(defel->defname, "encoding") == 0)
956 if (cstate->file_encoding >= 0)
958 (errcode(ERRCODE_SYNTAX_ERROR),
959 errmsg("conflicting or redundant options")));
960 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
/* a negative result from pg_char_to_encoding is treated as an unknown name */
961 if (cstate->file_encoding < 0)
963 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
964 errmsg("argument to option \"%s\" must be a valid encoding name",
969 (errcode(ERRCODE_SYNTAX_ERROR),
970 errmsg("option \"%s\" not recognized",
975 * Check for incompatible options (must do these two before inserting
978 if (cstate->binary && cstate->delim)
980 (errcode(ERRCODE_SYNTAX_ERROR),
981 errmsg("cannot specify DELIMITER in BINARY mode")));
983 if (cstate->binary && cstate->null_print)
985 (errcode(ERRCODE_SYNTAX_ERROR),
986 errmsg("cannot specify NULL in BINARY mode")));
988 /* Set defaults for omitted options */
/* default delimiter: comma in CSV mode, tab otherwise */
990 cstate->delim = cstate->csv_mode ? "," : "\t";
/* default null marker: empty string in CSV mode, backslash-N otherwise */
992 if (!cstate->null_print)
993 cstate->null_print = cstate->csv_mode ? "" : "\\N";
994 cstate->null_print_len = strlen(cstate->null_print);
996 if (cstate->csv_mode)
/* default CSV quote is the double-quote character; escape defaults to the quote */
999 cstate->quote = "\"";
1000 if (!cstate->escape)
1001 cstate->escape = cstate->quote;
1004 /* Only single-byte delimiter strings are supported. */
1005 if (strlen(cstate->delim) != 1)
1007 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1008 errmsg("COPY delimiter must be a single one-byte character")));
1010 /* Disallow end-of-line characters */
1011 if (strchr(cstate->delim, '\r') != NULL ||
1012 strchr(cstate->delim, '\n') != NULL)
1014 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1015 errmsg("COPY delimiter cannot be newline or carriage return")));
1017 if (strchr(cstate->null_print, '\r') != NULL ||
1018 strchr(cstate->null_print, '\n') != NULL)
1020 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1021 errmsg("COPY null representation cannot use newline or carriage return")));
1024 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1025 * backslash because it would be ambiguous. We can't allow the other
1026 * cases because data characters matching the delimiter must be
1027 * backslashed, and certain backslash combinations are interpreted
1028 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1029 * more than strictly necessary, but seems best for consistency and
1030 * future-proofing. Likewise we disallow all digits though only octal
1031 * digits are actually dangerous.
/* the single-byte length check above guarantees delim[0] is not the NUL that strchr would also match */
1033 if (!cstate->csv_mode &&
1034 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1035 cstate->delim[0]) != NULL)
1037 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1038 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1041 if (!cstate->csv_mode && cstate->header_line)
1043 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1044 errmsg("COPY HEADER available only in CSV mode")));
/* outside CSV mode quote is still NULL unless the user supplied it, since defaults are set only in CSV mode */
1047 if (!cstate->csv_mode && cstate->quote != NULL)
1049 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1050 errmsg("COPY quote available only in CSV mode")));
1052 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1054 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1055 errmsg("COPY quote must be a single one-byte character")));
1057 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1059 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1060 errmsg("COPY delimiter and quote must be different")));
1063 if (!cstate->csv_mode && cstate->escape != NULL)
1065 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1066 errmsg("COPY escape available only in CSV mode")));
1068 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1070 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1071 errmsg("COPY escape must be a single one-byte character")));
1073 /* Check force_quote */
1074 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1076 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1077 errmsg("COPY force quote available only in CSV mode")));
1078 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1080 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1081 errmsg("COPY force quote only available using COPY TO")));
1083 /* Check force_notnull */
1084 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1086 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1087 errmsg("COPY force not null available only in CSV mode")));
1088 if (cstate->force_notnull != NIL && !is_from)
1090 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1091 errmsg("COPY force not null only available using COPY FROM")));
1093 /* Don't allow the delimiter to appear in the null string. */
1094 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1096 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1097 errmsg("COPY delimiter must not appear in the NULL specification")));
1099 /* Don't allow the CSV quote char to appear in the null string. */
1100 if (cstate->csv_mode &&
1101 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1103 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1104 errmsg("CSV quote character must not appear in the NULL specification")));
1108 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1110 * Iff <binary>, unload or reload in the binary format, as opposed to the
1111 * more wasteful but more robust and portable text format.
1113 * Iff <oids>, unload or reload the format that includes OID information.
1114 * On input, we accept OIDs whether or not the table has an OID column,
1115 * but silently drop them if it does not. On output, we report an error
1116 * if the user asks for OIDs in a table that has none (not providing an
1117 * OID column might seem friendlier, but could seriously confuse programs).
1119 * If in the text format, delimit columns with delimiter <delim> and print
1120 * NULL values as <null_print>.
1123 BeginCopy(bool is_from,
1126 const char *queryString,
1133 MemoryContext oldcontext;
1135 /* Allocate workspace and zero all fields */
1136 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1139 * We allocate everything used by a cstate in a new memory context. This
1140 * avoids memory leaks during repeated use of COPY in a query.
1142 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1144 ALLOCSET_DEFAULT_MINSIZE,
1145 ALLOCSET_DEFAULT_INITSIZE,
1146 ALLOCSET_DEFAULT_MAXSIZE);
1148 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1150 /* Extract options from the statement node tree */
1151 ProcessCopyOptions(cstate, is_from, options);
1153 /* Process the source/target relation or query */
1160 tupDesc = RelationGetDescr(cstate->rel);
1162 /* Don't allow COPY w/ OIDs to or from a table without them */
1163 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1165 (errcode(ERRCODE_UNDEFINED_COLUMN),
1166 errmsg("table \"%s\" does not have OIDs",
1167 RelationGetRelationName(cstate->rel))));
1179 /* Don't allow COPY w/ OIDs from a select */
1182 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1183 errmsg("COPY (SELECT) WITH OIDS is not supported")));
1186 * Run parse analysis and rewrite. Note this also acquires sufficient
1187 * locks on the source table(s).
1189 * Because the parser and planner tend to scribble on their input, we
1190 * make a preliminary copy of the source querytree. This prevents
1191 * problems in the case that the COPY is in a portal or plpgsql
1192 * function and is executed repeatedly. (See also the same hack in
1193 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1195 rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
1196 queryString, NULL, 0);
1198 /* We don't expect more or less than one result query */
1199 if (list_length(rewritten) != 1)
1200 elog(ERROR, "unexpected rewrite result");
1202 query = (Query *) linitial(rewritten);
1203 Assert(query->commandType == CMD_SELECT);
1204 Assert(query->utilityStmt == NULL);
1206 /* Query mustn't use INTO, either */
1207 if (query->intoClause)
1209 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1210 errmsg("COPY (SELECT INTO) is not supported")));
1212 /* plan the query */
1213 plan = planner(query, 0, NULL);
1216 * Use a snapshot with an updated command ID to ensure this query sees
1217 * results of any previously executed queries.
1219 PushCopiedSnapshot(GetActiveSnapshot());
1220 UpdateActiveSnapshotCommandId();
1222 /* Create dest receiver for COPY OUT */
1223 dest = CreateDestReceiver(DestCopyOut);
1224 ((DR_copy *) dest)->cstate = cstate;
1226 /* Create a QueryDesc requesting no output */
1227 cstate->queryDesc = CreateQueryDesc(plan, queryString,
1228 GetActiveSnapshot(),
1233 * Call ExecutorStart to prepare the plan for execution.
1235 * ExecutorStart computes a result tupdesc for us
1237 ExecutorStart(cstate->queryDesc, 0);
1239 tupDesc = cstate->queryDesc->tupDesc;
1242 /* Generate or convert list of attributes to process */
1243 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1245 num_phys_attrs = tupDesc->natts;
1247 /* Convert FORCE QUOTE name list to per-column flags, check validity */
1248 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1249 if (cstate->force_quote_all)
1253 for (i = 0; i < num_phys_attrs; i++)
1254 cstate->force_quote_flags[i] = true;
1256 else if (cstate->force_quote)
1261 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1263 foreach(cur, attnums)
1265 int attnum = lfirst_int(cur);
1267 if (!list_member_int(cstate->attnumlist, attnum))
1269 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1270 errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
1271 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1272 cstate->force_quote_flags[attnum - 1] = true;
1276 /* Convert FORCE NOT NULL name list to per-column flags, check validity */
1277 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1278 if (cstate->force_notnull)
1283 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1285 foreach(cur, attnums)
1287 int attnum = lfirst_int(cur);
1289 if (!list_member_int(cstate->attnumlist, attnum))
1291 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1292 errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
1293 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1294 cstate->force_notnull_flags[attnum - 1] = true;
1298 /* Use client encoding when ENCODING option is not specified. */
1299 if (cstate->file_encoding < 0)
1300 cstate->file_encoding = pg_get_client_encoding();
1303 * Set up encoding conversion info. Even if the file and server encodings
1304 * are the same, we must apply pg_any_to_server() to validate data in
1305 * multibyte encodings.
1307 cstate->need_transcoding =
1308 (cstate->file_encoding != GetDatabaseEncoding() ||
1309 pg_database_encoding_max_length() > 1);
1310 /* See Multibyte encoding comment above */
1311 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1313 cstate->copy_dest = COPY_FILE; /* default */
1315 MemoryContextSwitchTo(oldcontext);
1321 * Release resources allocated in a cstate for COPY TO/FROM.
1324 EndCopy(CopyState cstate)
1326 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1328 (errcode_for_file_access(),
1329 errmsg("could not close file \"%s\": %m",
1330 cstate->filename)));
1332 MemoryContextDelete(cstate->copycontext);
1337 * Setup CopyState to read tuples from a table or a query for COPY TO.
1340 BeginCopyTo(Relation rel,
1342 const char *queryString,
1343 const char *filename,
1348 bool pipe = (filename == NULL);
1349 MemoryContext oldcontext;
1351 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1353 if (rel->rd_rel->relkind == RELKIND_VIEW)
1355 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1356 errmsg("cannot copy from view \"%s\"",
1357 RelationGetRelationName(rel)),
1358 errhint("Try the COPY (SELECT ...) TO variant.")));
1359 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1361 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1362 errmsg("cannot copy from foreign table \"%s\"",
1363 RelationGetRelationName(rel)),
1364 errhint("Try the COPY (SELECT ...) TO variant.")));
1365 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1367 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1368 errmsg("cannot copy from sequence \"%s\"",
1369 RelationGetRelationName(rel))));
1372 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1373 errmsg("cannot copy from non-table relation \"%s\"",
1374 RelationGetRelationName(rel))));
1377 cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
1378 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1382 if (whereToSendOutput != DestRemote)
1383 cstate->copy_file = stdout;
1387 mode_t oumask; /* Pre-existing umask value */
1391 * Prevent write to relative path ... too easy to shoot oneself in the
1392 * foot by overwriting a database file ...
1394 if (!is_absolute_path(filename))
1396 (errcode(ERRCODE_INVALID_NAME),
1397 errmsg("relative path not allowed for COPY to file")));
1399 cstate->filename = pstrdup(filename);
1400 oumask = umask(S_IWGRP | S_IWOTH);
1401 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1404 if (cstate->copy_file == NULL)
1406 (errcode_for_file_access(),
1407 errmsg("could not open file \"%s\" for writing: %m",
1408 cstate->filename)));
1410 fstat(fileno(cstate->copy_file), &st);
1411 if (S_ISDIR(st.st_mode))
1413 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1414 errmsg("\"%s\" is a directory", cstate->filename)));
1417 MemoryContextSwitchTo(oldcontext);
1423 * This intermediate routine exists mainly to localize the effects of setjmp
1424 * so we don't need to plaster a lot of variables with "volatile".
1427 DoCopyTo(CopyState cstate)
1429 bool pipe = (cstate->filename == NULL);
1430 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1436 SendCopyBegin(cstate);
1438 processed = CopyTo(cstate);
1441 SendCopyEnd(cstate);
1446 * Make sure we turn off old-style COPY OUT mode upon error. It is
1447 * okay to do this in all cases, since it does nothing if the mode is
1450 pq_endcopyout(true);
1459 * Clean up storage and release resources for COPY TO.
1462 EndCopyTo(CopyState cstate)
1464 if (cstate->queryDesc != NULL)
1466 /* Close down the query and free resources. */
1467 ExecutorFinish(cstate->queryDesc);
1468 ExecutorEnd(cstate->queryDesc);
1469 FreeQueryDesc(cstate->queryDesc);
1470 PopActiveSnapshot();
1473 /* Clean up storage */
1478 * Copy from relation or query TO file.
1481 CopyTo(CopyState cstate)
1485 Form_pg_attribute *attr;
1490 tupDesc = RelationGetDescr(cstate->rel);
1492 tupDesc = cstate->queryDesc->tupDesc;
1493 attr = tupDesc->attrs;
1494 num_phys_attrs = tupDesc->natts;
1495 cstate->null_print_client = cstate->null_print; /* default */
1497 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1498 cstate->fe_msgbuf = makeStringInfo();
1500 /* Get info about the columns we need to process. */
1501 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1502 foreach(cur, cstate->attnumlist)
1504 int attnum = lfirst_int(cur);
1509 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1513 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1516 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1520 * Create a temporary memory context that we can reset once per row to
1521 * recover palloc'd memory. This avoids any problems with leaks inside
1522 * datatype output routines, and should be faster than retail pfree's
1523 * anyway. (We don't need a whole econtext as CopyFrom does.)
1525 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1527 ALLOCSET_DEFAULT_MINSIZE,
1528 ALLOCSET_DEFAULT_INITSIZE,
1529 ALLOCSET_DEFAULT_MAXSIZE);
1533 /* Generate header for a binary copy */
1537 CopySendData(cstate, (char *) BinarySignature, 11);
1542 CopySendInt32(cstate, tmp);
1543 /* No header extension */
1545 CopySendInt32(cstate, tmp);
1550 * For non-binary copy, we need to convert null_print to file
1551 * encoding, because it will be sent directly with CopySendString.
1553 if (cstate->need_transcoding)
1554 cstate->null_print_client = pg_server_to_any(cstate->null_print,
1555 cstate->null_print_len,
1556 cstate->file_encoding);
1558 /* if a header has been requested send the line */
1559 if (cstate->header_line)
1561 bool hdr_delim = false;
1563 foreach(cur, cstate->attnumlist)
1565 int attnum = lfirst_int(cur);
1569 CopySendChar(cstate, cstate->delim[0]);
1572 colname = NameStr(attr[attnum - 1]->attname);
1574 CopyAttributeOutCSV(cstate, colname, false,
1575 list_length(cstate->attnumlist) == 1);
1578 CopySendEndOfRow(cstate);
1586 HeapScanDesc scandesc;
1589 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1590 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
1592 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
1595 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
1597 CHECK_FOR_INTERRUPTS();
1599 /* Deconstruct the tuple ... faster than repeated heap_getattr */
1600 heap_deform_tuple(tuple, tupDesc, values, nulls);
1602 /* Format and send the data */
1603 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
1607 heap_endscan(scandesc);
1614 /* run the plan --- the dest receiver will send tuples */
1615 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
1616 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
1621 /* Generate trailer for a binary copy */
1622 CopySendInt16(cstate, -1);
1623 /* Need to flush out the trailer */
1624 CopySendEndOfRow(cstate);
1627 MemoryContextDelete(cstate->rowcontext);
1633 * Emit one row during CopyTo().
1636 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
1638 bool need_delim = false;
1639 FmgrInfo *out_functions = cstate->out_functions;
1640 MemoryContext oldcontext;
1644 MemoryContextReset(cstate->rowcontext);
1645 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1649 /* Binary per-tuple header */
1650 CopySendInt16(cstate, list_length(cstate->attnumlist));
1651 /* Send OID if wanted --- note attnumlist doesn't include it */
1654 /* Hack --- assume Oid is same size as int32 */
1655 CopySendInt32(cstate, sizeof(int32));
1656 CopySendInt32(cstate, tupleOid);
1661 /* Text format has no per-tuple header, but send OID if wanted */
1662 /* Assume digits don't need any quoting or encoding conversion */
1665 string = DatumGetCString(DirectFunctionCall1(oidout,
1666 ObjectIdGetDatum(tupleOid)));
1667 CopySendString(cstate, string);
1672 foreach(cur, cstate->attnumlist)
1674 int attnum = lfirst_int(cur);
1675 Datum value = values[attnum - 1];
1676 bool isnull = nulls[attnum - 1];
1678 if (!cstate->binary)
1681 CopySendChar(cstate, cstate->delim[0]);
1687 if (!cstate->binary)
1688 CopySendString(cstate, cstate->null_print_client);
1690 CopySendInt32(cstate, -1);
1694 if (!cstate->binary)
1696 string = OutputFunctionCall(&out_functions[attnum - 1],
1698 if (cstate->csv_mode)
1699 CopyAttributeOutCSV(cstate, string,
1700 cstate->force_quote_flags[attnum - 1],
1701 list_length(cstate->attnumlist) == 1);
1703 CopyAttributeOutText(cstate, string);
1709 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
1711 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
1712 CopySendData(cstate, VARDATA(outputbytes),
1713 VARSIZE(outputbytes) - VARHDRSZ);
1718 CopySendEndOfRow(cstate);
1720 MemoryContextSwitchTo(oldcontext);
1725 * error context callback for COPY FROM
1727 * The argument for the error context must be CopyState.
1730 CopyFromErrorCallback(void *arg)
1732 CopyState cstate = (CopyState) arg;
1736 /* can't usefully display the data */
1737 if (cstate->cur_attname)
1738 errcontext("COPY %s, line %d, column %s",
1739 cstate->cur_relname, cstate->cur_lineno,
1740 cstate->cur_attname);
1742 errcontext("COPY %s, line %d",
1743 cstate->cur_relname, cstate->cur_lineno);
1747 if (cstate->cur_attname && cstate->cur_attval)
1749 /* error is relevant to a particular column */
1752 attval = limit_printout_length(cstate->cur_attval);
1753 errcontext("COPY %s, line %d, column %s: \"%s\"",
1754 cstate->cur_relname, cstate->cur_lineno,
1755 cstate->cur_attname, attval);
1758 else if (cstate->cur_attname)
1760 /* error is relevant to a particular column, value is NULL */
1761 errcontext("COPY %s, line %d, column %s: null input",
1762 cstate->cur_relname, cstate->cur_lineno,
1763 cstate->cur_attname);
1767 /* error is relevant to a particular line */
1768 if (cstate->line_buf_converted || !cstate->need_transcoding)
1772 lineval = limit_printout_length(cstate->line_buf.data);
1773 errcontext("COPY %s, line %d: \"%s\"",
1774 cstate->cur_relname, cstate->cur_lineno, lineval);
1780 * Here, the line buffer is still in a foreign encoding, and
1781 * indeed it's quite likely that the error is precisely a
1782 * failure to do encoding conversion (ie, bad data). We dare
1783 * not try to convert it, and at present there's no way to
1784 * regurgitate it without conversion. So we have to punt and
1785 * just report the line number.
1787 errcontext("COPY %s, line %d",
1788 cstate->cur_relname, cstate->cur_lineno);
/*
 * limit_printout_length
 *
 * Keep COPY data quoted in a message down to a reasonable size.
 *
 * Returns a palloc'd copy of 'str'.  Input longer than
 * MAX_COPY_DATA_DISPLAY bytes is clipped with pg_mbcliplen() and has "..."
 * appended.
 *
 * Using the sprintf "precision" limit would look simpler, but some glibc
 * versions have a bug/misfeature whereby vsnprintf always fails (returns
 * -1) when asked to truncate a string containing byte sequences that are
 * invalid in the current encoding.  So the truncation is done by hand.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			input_len = strlen(str);
	int			clip_len;
	char	   *clipped;

	/* Short enough already: just hand back a copy */
	if (input_len <= MAX_COPY_DATA_DISPLAY)
		return pstrdup(str);

	/* Find an encoding-aware cut point */
	clip_len = pg_mbcliplen(str, input_len, MAX_COPY_DATA_DISPLAY);

	/* Room for the kept bytes, the "..." marker and the terminator */
	clipped = (char *) palloc(clip_len + 4);
	memcpy(clipped, str, clip_len);
	memcpy(clipped + clip_len, "...", 4);

	return clipped;
}
1830 * Copy FROM file to relation.
1833 CopyFrom(CopyState cstate)
1839 ResultRelInfo *resultRelInfo;
1840 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
1841 ExprContext *econtext;
1842 TupleTableSlot *myslot;
1843 MemoryContext oldcontext = CurrentMemoryContext;
1844 ErrorContextCallback errcontext;
1845 CommandId mycid = GetCurrentCommandId(true);
1846 int hi_options = 0; /* start with default heap_insert options */
1847 BulkInsertState bistate;
1848 uint64 processed = 0;
1850 Assert(cstate->rel);
1852 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
1854 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
1856 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1857 errmsg("cannot copy to view \"%s\"",
1858 RelationGetRelationName(cstate->rel))));
1859 else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1861 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1862 errmsg("cannot copy to foreign table \"%s\"",
1863 RelationGetRelationName(cstate->rel))));
1864 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
1866 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1867 errmsg("cannot copy to sequence \"%s\"",
1868 RelationGetRelationName(cstate->rel))));
1871 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1872 errmsg("cannot copy to non-table relation \"%s\"",
1873 RelationGetRelationName(cstate->rel))));
1876 tupDesc = RelationGetDescr(cstate->rel);
1879 * Check to see if we can avoid writing WAL
1881 * If archive logging/streaming is not enabled *and* either
1882 * - table was created in same transaction as this COPY
1883 * - data is being written to relfilenode created in this transaction
1884 * then we can skip writing WAL. It's safe because if the transaction
1885 * doesn't commit, we'll discard the table (or the new relfilenode file).
1886 * If it does commit, we'll have done the heap_sync at the bottom of this
1889 * As mentioned in comments in utils/rel.h, the in-same-transaction test
1890 * is not completely reliable, since in rare cases rd_createSubid or
1891 * rd_newRelfilenodeSubid can be cleared before the end of the transaction.
1892 * However this is OK since at worst we will fail to make the optimization.
1894 * Also, if the target file is new-in-transaction, we assume that checking
1895 * FSM for free space is a waste of time, even if we must use WAL because
1896 * of archiving. This could possibly be wrong, but it's unlikely.
1898 * The comments for heap_insert and RelationGetBufferForTuple specify that
1899 * skipping WAL logging is only safe if we ensure that our tuples do not
1900 * go into pages containing tuples from any other transactions --- but this
1901 * must be the case if we have a new table or new relfilenode, so we need
1902 * no additional work to enforce that.
1905 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
1906 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
1908 hi_options |= HEAP_INSERT_SKIP_FSM;
1909 if (!XLogIsNeeded())
1910 hi_options |= HEAP_INSERT_SKIP_WAL;
1914 * We need a ResultRelInfo so we can use the regular executor's
1915 * index-entry-making machinery. (There used to be a huge amount of code
1916 * here that basically duplicated execUtils.c ...)
1918 resultRelInfo = makeNode(ResultRelInfo);
1919 resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
1920 resultRelInfo->ri_RelationDesc = cstate->rel;
1921 resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
1922 if (resultRelInfo->ri_TrigDesc)
1924 resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
1925 palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
1926 resultRelInfo->ri_TrigWhenExprs = (List **)
1927 palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(List *));
1929 resultRelInfo->ri_TrigInstrument = NULL;
1931 ExecOpenIndices(resultRelInfo);
1933 estate->es_result_relations = resultRelInfo;
1934 estate->es_num_result_relations = 1;
1935 estate->es_result_relation_info = resultRelInfo;
1937 /* Set up a tuple slot too */
1938 myslot = ExecInitExtraTupleSlot(estate);
1939 ExecSetSlotDescriptor(myslot, tupDesc);
1940 /* Triggers might need a slot as well */
1941 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
1943 /* Prepare to catch AFTER triggers. */
1944 AfterTriggerBeginQuery();
1947 * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
1948 * should do this for COPY, since it's not really an "INSERT" statement as
1949 * such. However, executing these triggers maintains consistency with the
1950 * EACH ROW triggers that we already fire on COPY.
1952 ExecBSInsertTriggers(estate, resultRelInfo);
1954 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1955 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1957 bistate = GetBulkInsertState();
1958 econtext = GetPerTupleExprContext(estate);
1960 /* Set up callback to identify error line number */
1961 errcontext.callback = CopyFromErrorCallback;
1962 errcontext.arg = (void *) cstate;
1963 errcontext.previous = error_context_stack;
1964 error_context_stack = &errcontext;
1968 TupleTableSlot *slot;
1970 Oid loaded_oid = InvalidOid;
1972 CHECK_FOR_INTERRUPTS();
1974 /* Reset the per-tuple exprcontext */
1975 ResetPerTupleExprContext(estate);
1977 /* Switch into its memory context */
1978 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1980 if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
1983 /* And now we can form the input tuple. */
1984 tuple = heap_form_tuple(tupDesc, values, nulls);
1986 if (loaded_oid != InvalidOid)
1987 HeapTupleSetOid(tuple, loaded_oid);
1989 /* Triggers and stuff need to be invoked in query context. */
1990 MemoryContextSwitchTo(oldcontext);
1992 /* Place tuple in tuple slot --- but slot shouldn't free it */
1994 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
1998 /* BEFORE ROW INSERT Triggers */
1999 if (resultRelInfo->ri_TrigDesc &&
2000 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2002 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2004 if (slot == NULL) /* "do nothing" */
2006 else /* trigger might have changed tuple */
2007 tuple = ExecMaterializeSlot(slot);
2012 List *recheckIndexes = NIL;
2014 /* Check the constraints of the tuple */
2015 if (cstate->rel->rd_att->constr)
2016 ExecConstraints(resultRelInfo, slot, estate);
2018 /* OK, store the tuple and create index entries for it */
2019 heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
2021 if (resultRelInfo->ri_NumIndices > 0)
2022 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
2025 /* AFTER ROW INSERT Triggers */
2026 ExecARInsertTriggers(estate, resultRelInfo, tuple,
2029 list_free(recheckIndexes);
2032 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2033 * this is the same definition used by execMain.c for counting
2034 * tuples inserted by an INSERT command.
2040 /* Done, clean up */
2041 error_context_stack = errcontext.previous;
2043 FreeBulkInsertState(bistate);
2045 MemoryContextSwitchTo(oldcontext);
2047 /* Execute AFTER STATEMENT insertion triggers */
2048 ExecASInsertTriggers(estate, resultRelInfo);
2050 /* Handle queued AFTER triggers */
2051 AfterTriggerEndQuery(estate);
2056 ExecResetTupleTable(estate->es_tupleTable, false);
2058 ExecCloseIndices(resultRelInfo);
2060 FreeExecutorState(estate);
2063 * If we skipped writing WAL, then we need to sync the heap (but not
2064 * indexes since those use WAL anyway)
2066 if (hi_options & HEAP_INSERT_SKIP_WAL)
2067 heap_sync(cstate->rel);
2073 * Setup to read tuples from a file for COPY FROM.
2075 * 'rel': Used as a template for the tuples
2076 * 'filename': Name of server-local file to read
2077 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2078 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2080 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2083 BeginCopyFrom(Relation rel,
2084 const char *filename,
2089 bool pipe = (filename == NULL);
2091 Form_pg_attribute *attr;
2092 AttrNumber num_phys_attrs,
2094 FmgrInfo *in_functions;
2099 ExprState **defexprs;
2100 MemoryContext oldcontext;
2102 cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
2103 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2105 /* Initialize state variables */
2106 cstate->fe_eof = false;
2107 cstate->eol_type = EOL_UNKNOWN;
2108 cstate->cur_relname = RelationGetRelationName(cstate->rel);
2109 cstate->cur_lineno = 0;
2110 cstate->cur_attname = NULL;
2111 cstate->cur_attval = NULL;
2113 /* Set up variables to avoid per-attribute overhead. */
2114 initStringInfo(&cstate->attribute_buf);
2115 initStringInfo(&cstate->line_buf);
2116 cstate->line_buf_converted = false;
2117 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2118 cstate->raw_buf_index = cstate->raw_buf_len = 0;
2120 tupDesc = RelationGetDescr(cstate->rel);
2121 attr = tupDesc->attrs;
2122 num_phys_attrs = tupDesc->natts;
2126 * Pick up the required catalog information for each attribute in the
2127 * relation, including the input function, the element type (to pass to
2128 * the input function), and info about defaults and constraints. (Which
2129 * input function we use depends on text/binary format choice.)
2131 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2132 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
2133 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
2134 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
2136 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
2138 /* We don't need info for dropped attributes */
2139 if (attr[attnum - 1]->attisdropped)
2142 /* Fetch the input function and typioparam info */
2144 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
2145 &in_func_oid, &typioparams[attnum - 1]);
2147 getTypeInputInfo(attr[attnum - 1]->atttypid,
2148 &in_func_oid, &typioparams[attnum - 1]);
2149 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
2151 /* Get default info if needed */
2152 if (!list_member_int(cstate->attnumlist, attnum))
2154 /* attribute is NOT to be copied from input */
2155 /* use default value if one exists */
2156 Node *defexpr = build_column_default(cstate->rel, attnum);
2158 if (defexpr != NULL)
2160 /* Initialize expressions in copycontext. */
2161 defexprs[num_defaults] = ExecInitExpr(
2162 expression_planner((Expr *) defexpr), NULL);
2163 defmap[num_defaults] = attnum - 1;
2169 /* We keep those variables in cstate. */
2170 cstate->in_functions = in_functions;
2171 cstate->typioparams = typioparams;
2172 cstate->defmap = defmap;
2173 cstate->defexprs = defexprs;
2174 cstate->num_defaults = num_defaults;
2178 if (whereToSendOutput == DestRemote)
2179 ReceiveCopyBegin(cstate);
2181 cstate->copy_file = stdin;
2187 cstate->filename = pstrdup(filename);
2188 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
2190 if (cstate->copy_file == NULL)
2192 (errcode_for_file_access(),
2193 errmsg("could not open file \"%s\" for reading: %m",
2194 cstate->filename)));
2196 fstat(fileno(cstate->copy_file), &st);
2197 if (S_ISDIR(st.st_mode))
2199 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2200 errmsg("\"%s\" is a directory", cstate->filename)));
2203 if (!cstate->binary)
2205 /* must rely on user to tell us... */
2206 cstate->file_has_oids = cstate->oids;
2210 /* Read and verify binary header */
2215 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
2216 memcmp(readSig, BinarySignature, 11) != 0)
2218 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2219 errmsg("COPY file signature not recognized")));
2221 if (!CopyGetInt32(cstate, &tmp))
2223 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2224 errmsg("invalid COPY file header (missing flags)")));
2225 cstate->file_has_oids = (tmp & (1 << 16)) != 0;
2227 if ((tmp >> 16) != 0)
2229 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2230 errmsg("unrecognized critical flags in COPY file header")));
2231 /* Header extension length */
2232 if (!CopyGetInt32(cstate, &tmp) ||
2235 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2236 errmsg("invalid COPY file header (missing length)")));
2237 /* Skip extension header, if present */
2240 if (CopyGetData(cstate, readSig, 1, 1) != 1)
2242 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2243 errmsg("invalid COPY file header (wrong length)")));
2247 if (cstate->file_has_oids && cstate->binary)
2249 getTypeBinaryInputInfo(OIDOID,
2250 &in_func_oid, &cstate->oid_typioparam);
2251 fmgr_info(in_func_oid, &cstate->oid_in_function);
2254 /* create workspace for CopyReadAttributes results */
2255 if (!cstate->binary)
2257 AttrNumber attr_count = list_length(cstate->attnumlist);
2258 int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
2260 cstate->max_fields = nfields;
2261 cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
2264 MemoryContextSwitchTo(oldcontext);
2270 * Read raw fields in the next line for COPY FROM in text or csv mode.
2271 * Return false if no more lines.
2273 * An internal temporary buffer is returned via 'fields'. It is valid until
2274 * the next call of the function. Since the function returns all raw fields
2275 * in the input file, 'nfields' could be different from the number of columns
2278 * NOTE: force_not_null option are not applied to the returned fields.
2281 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
2286 /* only available for text or csv input */
2287 Assert(!cstate->binary);
2289 /* on input just throw the header line away */
2290 if (cstate->cur_lineno == 0 && cstate->header_line)
2292 cstate->cur_lineno++;
2293 if (CopyReadLine(cstate))
2294 return false; /* done */
2297 cstate->cur_lineno++;
2299 /* Actually read the line into memory here */
2300 done = CopyReadLine(cstate);
2303 * EOF at start of line means we're done. If we see EOF after some
2304 * characters, we act as though it was newline followed by EOF, ie,
2305 * process the line and then exit loop on next iteration.
2307 if (done && cstate->line_buf.len == 0)
2310 /* Parse the line into de-escaped field values */
2311 if (cstate->csv_mode)
2312 fldct = CopyReadAttributesCSV(cstate);
2314 fldct = CopyReadAttributesText(cstate);
2316 *fields = cstate->raw_fields;
2322 * Read next tuple from file for COPY FROM. Return false if no more tuples.
2324 * 'econtext' is used to evaluate the default expression for each column not
2325 * read from the file. It can be NULL when no default values are used, i.e.
2326 * when all columns are read from the file.
2328 * 'values' and 'nulls' arrays must be the same length as columns of the
2329 * relation passed to BeginCopyFrom. This function fills the arrays.
2330 * Oid of the tuple is returned with 'tupleOid' separately.
2333 NextCopyFrom(CopyState cstate, ExprContext *econtext,
2334 Datum *values, bool *nulls, Oid *tupleOid)
2337 Form_pg_attribute *attr;
2338 AttrNumber num_phys_attrs,
2340 num_defaults = cstate->num_defaults;
2341 FmgrInfo *in_functions = cstate->in_functions;
2342 Oid *typioparams = cstate->typioparams;
2346 bool file_has_oids = cstate->file_has_oids;
2347 int *defmap = cstate->defmap;
2348 ExprState **defexprs = cstate->defexprs;
2350 tupDesc = RelationGetDescr(cstate->rel);
2351 attr = tupDesc->attrs;
2352 num_phys_attrs = tupDesc->natts;
2353 attr_count = list_length(cstate->attnumlist);
2354 nfields = file_has_oids ? (attr_count + 1) : attr_count;
2356 /* Initialize all values for row to NULL */
2357 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
2358 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
2360 if (!cstate->binary)
2362 char **field_strings;
2368 /* read raw fields in the next line */
2369 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
2372 /* check for overflowing fields */
2373 if (nfields > 0 && fldct > nfields)
2375 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2376 errmsg("extra data after last expected column")));
2380 /* Read the OID field if present */
2383 if (fieldno >= fldct)
2385 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2386 errmsg("missing data for OID column")));
2387 string = field_strings[fieldno++];
2391 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2392 errmsg("null OID in COPY data")));
2393 else if (cstate->oids && tupleOid != NULL)
2395 cstate->cur_attname = "oid";
2396 cstate->cur_attval = string;
2397 *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
2398 CStringGetDatum(string)));
2399 if (*tupleOid == InvalidOid)
2401 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2402 errmsg("invalid OID in COPY data")));
2403 cstate->cur_attname = NULL;
2404 cstate->cur_attval = NULL;
2408 /* Loop to read the user attributes on the line. */
2409 foreach(cur, cstate->attnumlist)
2411 int attnum = lfirst_int(cur);
2414 if (fieldno >= fldct)
2416 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2417 errmsg("missing data for column \"%s\"",
2418 NameStr(attr[m]->attname))));
2419 string = field_strings[fieldno++];
2421 if (cstate->csv_mode && string == NULL &&
2422 cstate->force_notnull_flags[m])
2424 /* Go ahead and read the NULL string */
2425 string = cstate->null_print;
2428 cstate->cur_attname = NameStr(attr[m]->attname);
2429 cstate->cur_attval = string;
2430 values[m] = InputFunctionCall(&in_functions[m],
2433 attr[m]->atttypmod);
2436 cstate->cur_attname = NULL;
2437 cstate->cur_attval = NULL;
2440 Assert(fieldno == nfields);
2448 cstate->cur_lineno++;
2450 if (!CopyGetInt16(cstate, &fld_count))
2452 /* EOF detected (end of file, or protocol-level EOF) */
2456 if (fld_count == -1)
2459 * Received EOF marker. In a V3-protocol copy, wait for the
2460 * protocol-level EOF, and complain if it doesn't come
2461 * immediately. This ensures that we correctly handle CopyFail,
2462 * if client chooses to send that now.
2464 * Note that we MUST NOT try to read more data in an old-protocol
2465 * copy, since there is no protocol-level EOF marker then. We
2466 * could go either way for copy from file, but choose to throw
2467 * error if there's data after the EOF marker, for consistency
2468 * with the new-protocol case.
2472 if (cstate->copy_dest != COPY_OLD_FE &&
2473 CopyGetData(cstate, &dummy, 1, 1) > 0)
2475 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2476 errmsg("received copy data after EOF marker")));
2480 if (fld_count != attr_count)
2482 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2483 errmsg("row field count is %d, expected %d",
2484 (int) fld_count, attr_count)));
2490 cstate->cur_attname = "oid";
2492 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
2494 &cstate->oid_in_function,
2495 cstate->oid_typioparam,
2498 if (isnull || loaded_oid == InvalidOid)
2500 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2501 errmsg("invalid OID in COPY data")));
2502 cstate->cur_attname = NULL;
2503 if (cstate->oids && tupleOid != NULL)
2504 *tupleOid = loaded_oid;
2508 foreach(cur, cstate->attnumlist)
2510 int attnum = lfirst_int(cur);
2513 cstate->cur_attname = NameStr(attr[m]->attname);
2515 values[m] = CopyReadBinaryAttribute(cstate,
2521 cstate->cur_attname = NULL;
2526 * Now compute and insert any defaults available for the columns not
2527 * provided by the input data. Anything not processed here or above will
2530 for (i = 0; i < num_defaults; i++)
2533 * The caller must supply econtext and have switched into the
2534 * per-tuple memory context in it.
2536 Assert(econtext != NULL);
2537 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
2539 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
2540 &nulls[defmap[i]], NULL);
/*
 * EndCopyFrom(cstate): teardown entry point for the COPY FROM operation
 * described by cstate.
 *
 * NOTE(review): the only body line visible in this listing is the remark
 * that COPY FROM holds no resources other than memory; the statement that
 * actually releases that memory is not shown here -- confirm against the
 * full file.
 */
2547 * Clean up storage and release resources for COPY FROM.
2550 EndCopyFrom(CopyState cstate)
2552 /* No COPY FROM related resources except memory. */
/*
 * CopyReadLine(cstate): fetch one input line into cstate->line_buf.
 *
 * Sequence visible below:
 *	1. line_buf is emptied and line_buf_converted is cleared.
 *	2. CopyReadLineText() performs the scan; its return value is kept in
 *	   "result".
 *	3. On the EOF path, when copy_dest is COPY_NEW_FE, raw_buf_index is set
 *	   to raw_buf_len and CopyLoadRawBuf() is called repeatedly until it
 *	   returns false, so the remaining copy data is read and thrown away.
 *	4. On the non-EOF path the line terminator is removed from the tail of
 *	   line_buf: one byte for a lone '\n' or '\r', two bytes for "\r\n",
 *	   selected by cstate->eol_type.  The Asserts document the expected
 *	   trailing bytes in each case.
 *	5. When need_transcoding is set the line is passed to
 *	   pg_any_to_server() with file_encoding; if a different pointer comes
 *	   back, line_buf is reset and refilled from it.
 *	6. line_buf_converted is set to true.
 *
 * NOTE(review): the tests that choose between steps 3 and 4, the case
 * labels of the switch and the final return are not visible in this
 * listing.
 */
2558 * Read the next input line and stash it in line_buf, with conversion to
2561 * Result is true if read was terminated by EOF, false if terminated
2562 * by newline. The terminating newline or EOF marker is not included
2563 * in the final value of line_buf.
2566 CopyReadLine(CopyState cstate)
2570 resetStringInfo(&cstate->line_buf);
2572 /* Mark that encoding conversion hasn't occurred yet */
2573 cstate->line_buf_converted = false;
2575 /* Parse data and transfer into line_buf */
2576 result = CopyReadLineText(cstate);
2581 * Reached EOF. In protocol version 3, we should ignore anything
2582 * after \. up to the protocol end of copy data. (XXX maybe better
2583 * not to treat \. as special?)
2585 if (cstate->copy_dest == COPY_NEW_FE)
2589 cstate->raw_buf_index = cstate->raw_buf_len;
2590 } while (CopyLoadRawBuf(cstate));
2596 * If we didn't hit EOF, then we must have transferred the EOL marker
2597 * to line_buf along with the data. Get rid of it.
2599 switch (cstate->eol_type)
2602 Assert(cstate->line_buf.len >= 1);
2603 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2604 cstate->line_buf.len--;
2605 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2608 Assert(cstate->line_buf.len >= 1);
2609 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
2610 cstate->line_buf.len--;
2611 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2614 Assert(cstate->line_buf.len >= 2);
2615 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
2616 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2617 cstate->line_buf.len -= 2;
2618 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2621 /* shouldn't get here */
2627 /* Done reading the line. Convert it to server encoding. */
2628 if (cstate->need_transcoding)
2632 cvt = pg_any_to_server(cstate->line_buf.data,
2633 cstate->line_buf.len,
2634 cstate->file_encoding);
2635 if (cvt != cstate->line_buf.data)
2637 /* transfer converted data back to line_buf */
2638 resetStringInfo(&cstate->line_buf);
2639 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
2644 /* Now it's safe to use the buffer in error messages */
2645 cstate->line_buf_converted = true;
/*
 * CopyReadLineText(cstate) -- reviewer summary of the visible code:
 *
 *	- The loop works on local copies of cstate->raw_buf (copy_raw_buf),
 *	  raw_buf_index (raw_buf_ptr) and raw_buf_len (copy_buf_len).
 *	  CopyLoadRawBuf() is called when raw_buf_ptr has reached copy_buf_len
 *	  or need_data is set; copy_buf_len is then re-read from cstate, and a
 *	  value <= 0 means there is no more input.
 *	- In csv_mode, in_quote and last_was_esc track quoting based on
 *	  cstate->quote[0] and cstate->escape[0].  While in_quote, an EOL
 *	  character ('\n' when eol_type is EOL_NL, otherwise '\r') only bumps
 *	  cstate->cur_lineno and does not end the line.
 *	- A '\r' or '\n' seen outside a CSV quote is a line terminator.
 *	  eol_type is set to EOL_CRNL, EOL_CR or EOL_NL when a terminator is
 *	  recognized, and a terminator that conflicts with the established
 *	  style is reported with ERRCODE_BAD_COPY_FILE_FORMAT.
 *	- A backslash (in csv_mode only when first_char_in_line is still true)
 *	  is examined for the "\." end-of-copy marker.  When the marker is
 *	  accepted, the bytes between raw_buf_index and prev_raw_ptr are
 *	  appended to line_buf, raw_buf_index is moved to raw_buf_ptr and
 *	  result becomes true.
 *	- When encoding_embeds_ascii is set and the current byte has its high
 *	  bit set, pg_encoding_mblen() supplies the character length and
 *	  raw_buf_ptr is advanced by mblen - 1 so the whole character is
 *	  consumed as a unit.
 *	- first_char_in_line is cleared at the bottom of every iteration.
 *
 * NOTE(review): IF_NEED_REFILL_AND_NOT_EOF_CONTINUE,
 * IF_NEED_REFILL_AND_EOF_BREAK and NO_END_OF_COPY_GOTO are macros defined
 * outside this section; check their expansion there before changing the
 * order of any statement in the loop.
 */
2651 * CopyReadLineText - inner loop of CopyReadLine for text mode
2654 CopyReadLineText(CopyState cstate)
2659 bool need_data = false;
2660 bool hit_eof = false;
2661 bool result = false;
2665 bool first_char_in_line = true;
2666 bool in_quote = false,
2667 last_was_esc = false;
2669 char escapec = '\0';
2671 if (cstate->csv_mode)
2673 quotec = cstate->quote[0];
2674 escapec = cstate->escape[0];
2675 /* ignore special escape processing if it's the same as quotec */
2676 if (quotec == escapec)
2680 mblen_str[1] = '\0';
2683 * The objective of this loop is to transfer the entire next input line
2684 * into line_buf. Hence, we only care for detecting newlines (\r and/or
2685 * \n) and the end-of-copy marker (\.).
2687 * In CSV mode, \r and \n inside a quoted field are just part of the data
2688 * value and are put in line_buf. We keep just enough state to know if we
2689 * are currently in a quoted field or not.
2691 * These four characters, and the CSV escape and quote characters, are
2692 * assumed the same in frontend and backend encodings.
2694 * For speed, we try to move data from raw_buf to line_buf in chunks
2695 * rather than one character at a time. raw_buf_ptr points to the next
2696 * character to examine; any characters from raw_buf_index to raw_buf_ptr
2697 * have been determined to be part of the line, but not yet transferred to
2700 * For a little extra speed within the loop, we copy raw_buf and
2701 * raw_buf_len into local variables.
2703 copy_raw_buf = cstate->raw_buf;
2704 raw_buf_ptr = cstate->raw_buf_index;
2705 copy_buf_len = cstate->raw_buf_len;
2713 * Load more data if needed. Ideally we would just force four bytes
2714 * of read-ahead and avoid the many calls to
2715 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
2716 * does not allow us to read too far ahead or we might read into the
2717 * next data, so we read-ahead only as far we know we can. One
2718 * optimization would be to read-ahead four byte here if
2719 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
2720 * considering the size of the buffer.
2722 if (raw_buf_ptr >= copy_buf_len || need_data)
2727 * Try to read some more data. This will certainly reset
2728 * raw_buf_index to zero, and raw_buf_ptr must go with it.
2730 if (!CopyLoadRawBuf(cstate))
2733 copy_buf_len = cstate->raw_buf_len;
2736 * If we are completely out of data, break out of the loop,
2739 if (copy_buf_len <= 0)
2747 /* OK to fetch a character */
2748 prev_raw_ptr = raw_buf_ptr;
2749 c = copy_raw_buf[raw_buf_ptr++];
2751 if (cstate->csv_mode)
2754 * If character is '\\' or '\r', we may need to look ahead below.
2755 * Force fetch of the next character if we don't already have it.
2756 * We need to do this before changing CSV state, in case one of
2757 * these characters is also the quote or escape character.
2759 * Note: old-protocol does not like forced prefetch, but it's OK
2760 * here since we cannot validly be at EOF.
2762 if (c == '\\' || c == '\r')
2764 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2768 * Dealing with quotes and escapes here is mildly tricky. If the
2769 * quote char is also the escape char, there's no problem - we
2770 * just use the char as a toggle. If they are different, we need
2771 * to ensure that we only take account of an escape inside a
2772 * quoted field and immediately preceding a quote char, and not
2773 * the second in a escape-escape sequence.
2775 if (in_quote && c == escapec)
2776 last_was_esc = !last_was_esc;
2777 if (c == quotec && !last_was_esc)
2778 in_quote = !in_quote;
2780 last_was_esc = false;
2783 * Updating the line count for embedded CR and/or LF chars is
2784 * necessarily a little fragile - this test is probably about the
2785 * best we can do. (XXX it's arguable whether we should do this
2786 * at all --- is cur_lineno a physical or logical count?)
2788 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
2789 cstate->cur_lineno++;
2793 if (c == '\r' && (!cstate->csv_mode || !in_quote))
2795 /* Check for \r\n on first line, _and_ handle \r\n. */
2796 if (cstate->eol_type == EOL_UNKNOWN ||
2797 cstate->eol_type == EOL_CRNL)
2800 * If need more data, go back to loop top to load it.
2802 * Note that if we are at EOF, c will wind up as '\0' because
2803 * of the guaranteed pad of raw_buf.
2805 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2808 c = copy_raw_buf[raw_buf_ptr];
2812 raw_buf_ptr++; /* eat newline */
2813 cstate->eol_type = EOL_CRNL; /* in case not set yet */
2817 /* found \r, but no \n */
2818 if (cstate->eol_type == EOL_CRNL)
2820 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2822 errmsg("literal carriage return found in data") :
2823 errmsg("unquoted carriage return found in data"),
2825 errhint("Use \"\\r\" to represent carriage return.") :
2826 errhint("Use quoted CSV field to represent carriage return.")));
2829 * if we got here, it is the first line and we didn't find
2830 * \n, so don't consume the peeked character
2832 cstate->eol_type = EOL_CR;
2835 else if (cstate->eol_type == EOL_NL)
2837 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2839 errmsg("literal carriage return found in data") :
2840 errmsg("unquoted carriage return found in data"),
2842 errhint("Use \"\\r\" to represent carriage return.") :
2843 errhint("Use quoted CSV field to represent carriage return.")));
2844 /* If reach here, we have found the line terminator */
2849 if (c == '\n' && (!cstate->csv_mode || !in_quote))
2851 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
2853 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2855 errmsg("literal newline found in data") :
2856 errmsg("unquoted newline found in data"),
2858 errhint("Use \"\\n\" to represent newline.") :
2859 errhint("Use quoted CSV field to represent newline.")));
2860 cstate->eol_type = EOL_NL; /* in case not set yet */
2861 /* If reach here, we have found the line terminator */
2866 * In CSV mode, we only recognize \. alone on a line. This is because
2867 * \. is a valid CSV data value.
2869 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
2873 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2874 IF_NEED_REFILL_AND_EOF_BREAK(0);
2877 * get next character
2878 * Note: we do not change c so if it isn't \., we can fall
2879 * through and continue processing for file encoding.
2882 c2 = copy_raw_buf[raw_buf_ptr];
2886 raw_buf_ptr++; /* consume the '.' */
2889 * Note: if we loop back for more data here, it does not
2890 * matter that the CSV state change checks are re-executed; we
2891 * will come back here with no important state changed.
2893 if (cstate->eol_type == EOL_CRNL)
2895 /* Get the next character */
2896 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2897 /* if hit_eof, c2 will become '\0' */
2898 c2 = copy_raw_buf[raw_buf_ptr++];
2902 if (!cstate->csv_mode)
2904 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2905 errmsg("end-of-copy marker does not match previous newline style")));
2907 NO_END_OF_COPY_GOTO;
2909 else if (c2 != '\r')
2911 if (!cstate->csv_mode)
2913 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2914 errmsg("end-of-copy marker corrupt")));
2916 NO_END_OF_COPY_GOTO;
2920 /* Get the next character */
2921 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2922 /* if hit_eof, c2 will become '\0' */
2923 c2 = copy_raw_buf[raw_buf_ptr++];
2925 if (c2 != '\r' && c2 != '\n')
2927 if (!cstate->csv_mode)
2929 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2930 errmsg("end-of-copy marker corrupt")));
2932 NO_END_OF_COPY_GOTO;
2935 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
2936 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
2937 (cstate->eol_type == EOL_CR && c2 != '\r'))
2940 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2941 errmsg("end-of-copy marker does not match previous newline style")));
2945 * Transfer only the data before the \. into line_buf, then
2946 * discard the data and the \. sequence.
2948 if (prev_raw_ptr > cstate->raw_buf_index)
2949 appendBinaryStringInfo(&cstate->line_buf,
2950 cstate->raw_buf + cstate->raw_buf_index,
2951 prev_raw_ptr - cstate->raw_buf_index);
2952 cstate->raw_buf_index = raw_buf_ptr;
2953 result = true; /* report EOF */
2956 else if (!cstate->csv_mode)
2959 * If we are here, it means we found a backslash followed by
2960 * something other than a period. In non-CSV mode, anything
2961 * after a backslash is special, so we skip over that second
2962 * character too. If we didn't do that \\. would be
2963 * considered an eof-of copy, while in non-CSV mode it is a
2964 * literal backslash followed by a period. In CSV mode,
2965 * backslashes are not special, so we want to process the
2966 * character after the backslash just like a normal character,
2967 * so we don't increment in those cases.
2973 * This label is for CSV cases where \. appears at the start of a
2974 * line, but there is more text after it, meaning it was a data value.
2975 * We are more strict for \. in CSV mode because \. could be a data
2976 * value, while in non-CSV mode, \. cannot be a data value.
2981 * Process all bytes of a multi-byte character as a group.
2983 * We only support multi-byte sequences where the first byte has the
2984 * high-bit set, so as an optimization we can avoid this block
2985 * entirely if it is not set.
2987 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
2992 /* All our encodings only read the first byte to get the length */
2993 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
2994 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
2995 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
2996 raw_buf_ptr += mblen - 1;
2998 first_char_in_line = false;
2999 } /* end of outer loop */
3002 * Transfer any still-uncopied data to line_buf.
/*
 * GetDecimalFromHex(hex): numeric value of a single hexadecimal digit.
 *
 * A letter digit is folded to lower case and converted with
 * (c - 'a' + 10), so 'a'..'f' and 'A'..'F' yield 10..15.  Both <ctype.h>
 * calls receive the argument cast to unsigned char, which keeps them
 * well-defined for bytes with the high bit set.
 *
 * NOTE(review): no input validation is visible here; the callers in
 * CopyReadAttributesText test isxdigit() before calling.  The value
 * returned on the isdigit() branch is not shown in this listing.
 */
3010 * Return decimal value for a hexadecimal digit
3013 GetDecimalFromHex(char hex)
3015 if (isdigit((unsigned char) hex))
3018 return tolower((unsigned char) hex) - 'a' + 10;
/*
 * CopyReadAttributesText(cstate) -- reviewer notes on the visible code:
 *
 *	- delimc is the first byte of cstate->delim.
 *	- When max_fields <= 0, a non-empty line_buf is rejected with
 *	  "extra data after last expected column".
 *	- attribute_buf is reset; if its maxlen is <= line_buf.len,
 *	  enlargeStringInfo() is called with line_buf.len before any field
 *	  pointer is stored, so the buffer is never moved while raw_fields[]
 *	  points into it.
 *	- raw_fields is doubled with repalloc() whenever fieldno reaches
 *	  max_fields.
 *	- Octal escapes fold further digits in with (val << 3) and hex escapes
 *	  with (val << 4); each digit is read only while cur_ptr is still
 *	  before line_end_ptr.  A de-escaped '\0' or high-bit byte sets
 *	  saw_non_ascii, and the field is then checked with pg_verifymbstr().
 *	- Every field is NUL-terminated in the output area.
 *	- A field whose raw input (length end_ptr - start_ptr) matches
 *	  null_print by length and strncmp() gets a NULL raw_fields[] entry.
 *	- On exit attribute_buf.len is set to output_ptr minus the start of
 *	  attribute_buf.data.
 */
3022 * Parse the current line into separate attributes (fields),
3023 * performing de-escaping as needed.
3025 * The input is in line_buf. We use attribute_buf to hold the result
3026 * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3027 * string, or NULL when the input matches the null marker string.
3028 * This array is expanded as necessary.
3030 * (Note that the caller cannot check for nulls since the returned
3031 * string would be the post-de-escaping equivalent, which may look
3032 * the same as some valid data string.)
3034 * delim is the column delimiter string (must be just one byte for now).
3035 * null_print is the null marker string. Note that this is compared to
3036 * the pre-de-escaped input string.
3038 * The return value is the number of fields actually read.
3041 CopyReadAttributesText(CopyState cstate)
3043 char delimc = cstate->delim[0];
3050 * We need a special case for zero-column tables: check that the input
3051 * line is empty, and return.
3053 if (cstate->max_fields <= 0)
3055 if (cstate->line_buf.len != 0)
3057 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3058 errmsg("extra data after last expected column")));
3062 resetStringInfo(&cstate->attribute_buf);
3065 * The de-escaped attributes will certainly not be longer than the input
3066 * data line, so we can just force attribute_buf to be large enough and
3067 * then transfer data without any checks for enough space. We need to do
3068 * it this way because enlarging attribute_buf mid-stream would invalidate
3069 * pointers already stored into cstate->raw_fields[].
3071 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3072 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3073 output_ptr = cstate->attribute_buf.data;
3075 /* set pointer variables for loop */
3076 cur_ptr = cstate->line_buf.data;
3077 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3079 /* Outer loop iterates over fields */
3083 bool found_delim = false;
3087 bool saw_non_ascii = false;
3089 /* Make sure there is enough space for the next value */
3090 if (fieldno >= cstate->max_fields)
3092 cstate->max_fields *= 2;
3093 cstate->raw_fields =
3094 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3097 /* Remember start of field on both input and output sides */
3098 start_ptr = cur_ptr;
3099 cstate->raw_fields[fieldno] = output_ptr;
3101 /* Scan data for field */
3107 if (cur_ptr >= line_end_ptr)
3117 if (cur_ptr >= line_end_ptr)
3135 if (cur_ptr < line_end_ptr)
3141 val = (val << 3) + OCTVALUE(c);
3142 if (cur_ptr < line_end_ptr)
3148 val = (val << 3) + OCTVALUE(c);
3154 if (c == '\0' || IS_HIGHBIT_SET(c))
3155 saw_non_ascii = true;
3160 if (cur_ptr < line_end_ptr)
3162 char hexchar = *cur_ptr;
3164 if (isxdigit((unsigned char) hexchar))
3166 int val = GetDecimalFromHex(hexchar);
3169 if (cur_ptr < line_end_ptr)
3172 if (isxdigit((unsigned char) hexchar))
3175 val = (val << 4) + GetDecimalFromHex(hexchar);
3179 if (c == '\0' || IS_HIGHBIT_SET(c))
3180 saw_non_ascii = true;
3204 * in all other cases, take the char after '\'
3210 /* Add c to output string */
3214 /* Terminate attribute value in output area */
3215 *output_ptr++ = '\0';
3218 * If we de-escaped a non-7-bit-ASCII char, make sure we still have
3219 * valid data for the db encoding. Avoid calling strlen here for the
3220 * sake of efficiency.
3224 char *fld = cstate->raw_fields[fieldno];
3226 pg_verifymbstr(fld, output_ptr - (fld + 1), false);
3229 /* Check whether raw input matched null marker */
3230 input_len = end_ptr - start_ptr;
3231 if (input_len == cstate->null_print_len &&
3232 strncmp(start_ptr, cstate->null_print, input_len) == 0)
3233 cstate->raw_fields[fieldno] = NULL;
3236 /* Done if we hit EOL instead of a delim */
3241 /* Clean up state of attribute_buf */
3243 Assert(*output_ptr == '\0');
3244 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
/*
 * CopyReadAttributesCSV(cstate) -- reviewer notes on the visible code:
 *
 *	- delimc, quotec and escapec are the first bytes of cstate->delim,
 *	  cstate->quote and cstate->escape.
 *	- The zero-column check, the attribute_buf sizing and the doubling of
 *	  raw_fields follow the same steps as CopyReadAttributesText.
 *	- Reaching line_end_ptr while still inside a quoted section raises
 *	  "unterminated CSV quoted field".
 *	- Inside quotes, a following byte equal to escapec or quotec is copied
 *	  to the output literally; the escape test comes before the closing
 *	  quote test so that the two characters may be identical.
 *	- The null-marker comparison is skipped when saw_quote is set, so a
 *	  quoted field never becomes NULL here.
 *	- On exit attribute_buf.len is set to output_ptr minus the start of
 *	  attribute_buf.data.
 */
3250 * Parse the current line into separate attributes (fields),
3251 * performing de-escaping as needed. This has exactly the same API as
3252 * CopyReadAttributesText, except we parse the fields according to
3253 * "standard" (i.e. common) CSV usage.
3256 CopyReadAttributesCSV(CopyState cstate)
3258 char delimc = cstate->delim[0];
3259 char quotec = cstate->quote[0];
3260 char escapec = cstate->escape[0];
3267 * We need a special case for zero-column tables: check that the input
3268 * line is empty, and return.
3270 if (cstate->max_fields <= 0)
3272 if (cstate->line_buf.len != 0)
3274 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3275 errmsg("extra data after last expected column")));
3279 resetStringInfo(&cstate->attribute_buf);
3282 * The de-escaped attributes will certainly not be longer than the input
3283 * data line, so we can just force attribute_buf to be large enough and
3284 * then transfer data without any checks for enough space. We need to do
3285 * it this way because enlarging attribute_buf mid-stream would invalidate
3286 * pointers already stored into cstate->raw_fields[].
3288 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3289 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3290 output_ptr = cstate->attribute_buf.data;
3292 /* set pointer variables for loop */
3293 cur_ptr = cstate->line_buf.data;
3294 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3296 /* Outer loop iterates over fields */
3300 bool found_delim = false;
3301 bool saw_quote = false;
3306 /* Make sure there is enough space for the next value */
3307 if (fieldno >= cstate->max_fields)
3309 cstate->max_fields *= 2;
3310 cstate->raw_fields =
3311 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3314 /* Remember start of field on both input and output sides */
3315 start_ptr = cur_ptr;
3316 cstate->raw_fields[fieldno] = output_ptr;
3319 * Scan data for field,
3321 * The loop starts in "not quote" mode and then toggles between that
3322 * and "in quote" mode. The loop exits normally if it is in "not
3323 * quote" mode and a delimiter or line end is seen.
3333 if (cur_ptr >= line_end_ptr)
3336 /* unquoted field delimiter */
3342 /* start of quoted field (or part of field) */
3348 /* Add c to output string */
3356 if (cur_ptr >= line_end_ptr)
3358 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3359 errmsg("unterminated CSV quoted field")));
3363 /* escape within a quoted field */
3367 * peek at the next char if available, and escape it if it
3368 * is an escape char or a quote char
3370 if (cur_ptr < line_end_ptr)
3372 char nextc = *cur_ptr;
3374 if (nextc == escapec || nextc == quotec)
3376 *output_ptr++ = nextc;
3384 * end of quoted field. Must do this test after testing for
3385 * escape in case quote char and escape char are the same
3386 * (which is the common case).
3391 /* Add c to output string */
3397 /* Terminate attribute value in output area */
3398 *output_ptr++ = '\0';
3400 /* Check whether raw input matched null marker */
3401 input_len = end_ptr - start_ptr;
3402 if (!saw_quote && input_len == cstate->null_print_len &&
3403 strncmp(start_ptr, cstate->null_print, input_len) == 0)
3404 cstate->raw_fields[fieldno] = NULL;
3407 /* Done if we hit EOL instead of a delim */
3412 /* Clean up state of attribute_buf */
3414 Assert(*output_ptr == '\0');
3415 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
/*
 * CopyReadBinaryAttribute -- reviewer notes on the visible code:
 *
 *	cstate		COPY state; cstate->attribute_buf is the scratch buffer
 *	column_no	NOTE(review): not referenced in the lines visible here
 *	flinfo		lookup info for the receive function to invoke
 *	typioparam,
 *	typmod		passed unchanged to ReceiveFunctionCall()
 *
 * A 32-bit field size is read with CopyGetInt32(); failure to read it is
 * reported as "unexpected EOF in COPY data".  One path calls
 * ReceiveFunctionCall() with a NULL buffer and returns its result
 * directly, and another rejects the size as "invalid field size".
 * NOTE(review): the conditions selecting those two paths, and the rest of
 * the parameter list, are not visible in this listing.
 *
 * Otherwise exactly fld_size bytes are loaded into attribute_buf, the
 * buffer is NUL-terminated, the receive function is called, and an error
 * is raised unless the function consumed the whole buffer.
 */
3422 * Read a binary attribute
3425 CopyReadBinaryAttribute(CopyState cstate,
3426 int column_no, FmgrInfo *flinfo,
3427 Oid typioparam, int32 typmod,
3433 if (!CopyGetInt32(cstate, &fld_size))
3435 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3436 errmsg("unexpected EOF in COPY data")));
3440 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
3444 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3445 errmsg("invalid field size")));
3447 /* reset attribute_buf to empty, and load raw data in it */
3448 resetStringInfo(&cstate->attribute_buf);
3450 enlargeStringInfo(&cstate->attribute_buf, fld_size);
/* a short read (fewer than fld_size bytes delivered) is treated as EOF */
3451 if (CopyGetData(cstate, cstate->attribute_buf.data,
3452 fld_size, fld_size) != fld_size)
3454 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3455 errmsg("unexpected EOF in COPY data")));
3457 cstate->attribute_buf.len = fld_size;
3458 cstate->attribute_buf.data[fld_size] = '\0';
3460 /* Call the column type's binary input converter */
3461 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
3462 typioparam, typmod);
3464 /* Trouble if it didn't eat the whole buffer */
/* cursor is compared with len to detect bytes the converter left unread */
3465 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
3467 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
3468 errmsg("incorrect binary data format")));
/*
 * DUMPSOFAR() and CopyAttributeOutText(cstate, string) -- reviewer notes:
 *
 * DUMPSOFAR() passes the pending run of bytes, from "start" up to but not
 * including "ptr", to CopySendData().
 *
 * CopyAttributeOutText:
 *	- When need_transcoding is set, string is first converted with
 *	  pg_server_to_any() using file_encoding.
 *	- The scan loop exists twice.  The copy taken when
 *	  encoding_embeds_ascii is set additionally steps over multi-byte
 *	  characters with pg_encoding_mblen() for bytes that have the high bit
 *	  set; the other copy has no such branch.
 *	- Bytes below 0x20 that need conversion are sent as a backslash
 *	  followed by c, and the next run starts after the original byte
 *	  (start = ++ptr).
 *	- A backslash or the delimiter is sent as a backslash, and the byte
 *	  itself is left at the head of the next run (start = ptr++).
 *
 * NOTE(review): the switch that maps control characters to their escape
 * letters is not visible in this listing.
 */
3475 * Send text representation of one attribute, with conversion and escaping
3477 #define DUMPSOFAR() \
3480 CopySendData(cstate, start, ptr - start); \
3484 CopyAttributeOutText(CopyState cstate, char *string)
3489 char delimc = cstate->delim[0];
3491 if (cstate->need_transcoding)
3492 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
3497 * We have to grovel through the string searching for control characters
3498 * and instances of the delimiter character. In most cases, though, these
3499 * are infrequent. To avoid overhead from calling CopySendData once per
3500 * character, we dump out all characters between escaped characters in a
3501 * single call. The loop invariant is that the data from "start" to "ptr"
3502 * can be sent literally, but hasn't yet been.
3504 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
3505 * in valid backend encodings, extra bytes of a multibyte character never
3506 * look like ASCII. This loop is sufficiently performance-critical that
3507 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
3508 * of the normal safe-encoding path.
3510 if (cstate->encoding_embeds_ascii)
3513 while ((c = *ptr) != '\0')
3515 if ((unsigned char) c < (unsigned char) 0x20)
3518 * \r and \n must be escaped, the others are traditional. We
3519 * prefer to dump these using the C-like notation, rather than
3520 * a backslash and the literal character, because it makes the
3521 * dump file a bit more proof against Microsoftish data
3545 /* If it's the delimiter, must backslash it */
3548 /* All ASCII control chars are length 1 */
3550 continue; /* fall to end of loop */
3552 /* if we get here, we need to convert the control char */
3554 CopySendChar(cstate, '\\');
3555 CopySendChar(cstate, c);
3556 start = ++ptr; /* do not include char in next run */
3558 else if (c == '\\' || c == delimc)
3561 CopySendChar(cstate, '\\');
3562 start = ptr++; /* we include char in next run */
3564 else if (IS_HIGHBIT_SET(c))
3565 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
3573 while ((c = *ptr) != '\0')
3575 if ((unsigned char) c < (unsigned char) 0x20)
3578 * \r and \n must be escaped, the others are traditional. We
3579 * prefer to dump these using the C-like notation, rather than
3580 * a backslash and the literal character, because it makes the
3581 * dump file a bit more proof against Microsoftish data
3605 /* If it's the delimiter, must backslash it */
3608 /* All ASCII control chars are length 1 */
3610 continue; /* fall to end of loop */
3612 /* if we get here, we need to convert the control char */
3614 CopySendChar(cstate, '\\');
3615 CopySendChar(cstate, c);
3616 start = ++ptr; /* do not include char in next run */
3618 else if (c == '\\' || c == delimc)
3621 CopySendChar(cstate, '\\');
3622 start = ptr++; /* we include char in next run */
/*
 * CopyAttributeOutCSV(cstate, string, use_quote, single_attr) -- notes:
 *
 *	string		attribute text; it is compared with null_print before
 *				any encoding conversion takes place
 *	use_quote	caller's request for quoting, tested together with the
 *				null_print comparison
 *	single_attr	enables the special case for a value of exactly "\."
 *
 *	- When need_transcoding is set the value is converted with
 *	  pg_server_to_any() using file_encoding.
 *	- A preliminary pass looks for the delimiter, the quote character,
 *	  '\n' or '\r'; it steps over multi-byte characters with
 *	  pg_encoding_mblen() when encoding_embeds_ascii is set.
 *	- In the quoted form the value is wrapped in quotec, and escapec is
 *	  sent ahead of every embedded quotec or escapec.
 *	- Otherwise the value is sent unchanged with CopySendString().
 *
 * NOTE(review): the statements that set use_quote and the branch that
 * chooses between the quoted and unquoted forms are not visible in this
 * listing.
 */
3633 * Send text representation of one attribute, with conversion and
3634 * CSV-style escaping
3637 CopyAttributeOutCSV(CopyState cstate, char *string,
3638 bool use_quote, bool single_attr)
3643 char delimc = cstate->delim[0];
3644 char quotec = cstate->quote[0];
3645 char escapec = cstate->escape[0];
3647 /* force quoting if it matches null_print (before conversion!) */
3648 if (!use_quote && strcmp(string, cstate->null_print) == 0)
3651 if (cstate->need_transcoding)
3652 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
3657 * Make a preliminary pass to discover if it needs quoting
3662 * Because '\.' can be a data value, quote it if it appears alone on a
3663 * line so it is not interpreted as the end-of-data marker.
3665 if (single_attr && strcmp(ptr, "\\.") == 0)
3671 while ((c = *tptr) != '\0')
3673 if (c == delimc || c == quotec || c == '\n' || c == '\r')
3678 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3679 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
3688 CopySendChar(cstate, quotec);
3691 * We adopt the same optimization strategy as in CopyAttributeOutText
3694 while ((c = *ptr) != '\0')
3696 if (c == quotec || c == escapec)
3699 CopySendChar(cstate, escapec);
3700 start = ptr; /* we include char in next run */
3702 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3703 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
3709 CopySendChar(cstate, quotec);
3713 /* If it doesn't need quoting, we can just dump it as-is */
3714 CopySendString(cstate, ptr);
/*
 * CopyGetAttnums(tupDesc, rel, attnamelist) -- reviewer notes:
 *
 *	tupDesc		descriptor whose attrs[] array is searched
 *	rel			used only to name the relation in the "does not exist"
 *				error
 *	attnamelist	list of column-name nodes (read with strVal), or NIL
 *
 * With attnamelist == NIL the result holds i + 1 for every attribute that
 * is not dropped, in descriptor order, so the numbers are 1-based.
 *
 * Otherwise each name is matched with namestrcmp() against the attributes
 * that are not dropped.  An unmatched name raises
 * ERRCODE_UNDEFINED_COLUMN and a repeated name raises
 * ERRCODE_DUPLICATE_COLUMN; accepted attnums are appended in the order
 * given.
 */
3719 * CopyGetAttnums - build an integer list of attnums to be copied
3721 * The input attnamelist is either the user-specified column list,
3722 * or NIL if there was none (in which case we want all the non-dropped
3725 * rel can be NULL ... it's only used for error reports.
3728 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
3730 List *attnums = NIL;
3732 if (attnamelist == NIL)
3734 /* Generate default column list */
3735 Form_pg_attribute *attr = tupDesc->attrs;
3736 int attr_count = tupDesc->natts;
3739 for (i = 0; i < attr_count; i++)
3741 if (attr[i]->attisdropped)
3743 attnums = lappend_int(attnums, i + 1);
3748 /* Validate the user-supplied list and extract attnums */
3751 foreach(l, attnamelist)
3753 char *name = strVal(lfirst(l));
3757 /* Lookup column name */
3758 attnum = InvalidAttrNumber;
3759 for (i = 0; i < tupDesc->natts; i++)
3761 if (tupDesc->attrs[i]->attisdropped)
3763 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
3765 attnum = tupDesc->attrs[i]->attnum;
3769 if (attnum == InvalidAttrNumber)
3773 (errcode(ERRCODE_UNDEFINED_COLUMN),
3774 errmsg("column \"%s\" of relation \"%s\" does not exist",
3775 name, RelationGetRelationName(rel))));
3778 (errcode(ERRCODE_UNDEFINED_COLUMN),
3779 errmsg("column \"%s\" does not exist",
3782 /* Check for duplicates */
3783 if (list_member_int(attnums, attnum))
3785 (errcode(ERRCODE_DUPLICATE_COLUMN),
3786 errmsg("column \"%s\" specified more than once",
3788 attnums = lappend_int(attnums, attnum);
/*
 * DestReceiver startup callback; CreateCopyDestReceiver installs it as
 * pub.rStartup.
 *
 * NOTE(review): no body statements are visible in this listing, so what
 * it does with self, operation and typeinfo cannot be stated from here.
 */
3797 * copy_dest_startup --- executor startup
3800 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
/*
 * DestReceiver per-tuple callback; CreateCopyDestReceiver installs it as
 * pub.receiveSlot.
 *
 * self is cast to DR_copy to reach the CopyState.  Every attribute of the
 * slot is materialized, the row is written with CopyOneRowTo() using
 * InvalidOid as the tuple OID, and the receiver's processed counter is
 * incremented.
 */
3806 * copy_dest_receive --- receive one tuple
3809 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
3811 DR_copy *myState = (DR_copy *) self;
3812 CopyState cstate = myState->cstate;
3814 /* Make sure the tuple is fully deconstructed */
3815 slot_getallattrs(slot);
3817 /* And send the data */
3818 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
/* one more row handled by this receiver */
3819 myState->processed++;
/*
 * DestReceiver shutdown callback; CreateCopyDestReceiver installs it as
 * pub.rShutdown.
 *
 * NOTE(review): no body statements are visible in this listing.
 */
3823 * copy_dest_shutdown --- executor end
3826 copy_dest_shutdown(DestReceiver *self)
/*
 * DestReceiver destroy callback; CreateCopyDestReceiver installs it as
 * pub.rDestroy.
 *
 * NOTE(review): the statement that releases the object is not visible in
 * this listing.
 */
3832 * copy_dest_destroy --- release DestReceiver object
3835 copy_dest_destroy(DestReceiver *self)
/*
 * Allocates a DR_copy with palloc(), installs the four copy_dest_*
 * callbacks, and tags the receiver as DestCopyOut.  cstate starts out NULL
 * and must be filled in by the caller before tuples are received, since
 * copy_dest_receive reads it; processed starts at zero.  The object is
 * returned as a generic DestReceiver pointer.
 */
3841 * CreateCopyDestReceiver -- create a suitable DestReceiver object
3844 CreateCopyDestReceiver(void)
3846 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
/* callback table consulted by whoever drives this receiver */
3848 self->pub.receiveSlot = copy_dest_receive;
3849 self->pub.rStartup = copy_dest_startup;
3850 self->pub.rShutdown = copy_dest_shutdown;
3851 self->pub.rDestroy = copy_dest_destroy;
3852 self->pub.mydest = DestCopyOut;
3854 self->cstate = NULL; /* will be set later */
3855 self->processed = 0;
3857 return (DestReceiver *) self;