1 /*-------------------------------------------------------------------------
4 * Implements the COPY utility command
6 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/commands/copy.c
13 *-------------------------------------------------------------------------
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "catalog/namespace.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "parser/parse_relation.h"
40 #include "rewrite/rewriteHandler.h"
41 #include "storage/fd.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/acl.h"
44 #include "utils/builtins.h"
45 #include "utils/lsyscache.h"
46 #include "utils/memutils.h"
47 #include "utils/portal.h"
48 #include "utils/rel.h"
49 #include "utils/snapmgr.h"
/*
 * Octal-digit helpers.  ISOCTAL(c) is nonzero when c is one of the
 * characters '0' through '7'; OCTVALUE(c) maps such a character to its
 * numeric value (0..7).  ISOCTAL evaluates its argument twice, so only pass
 * expressions without side effects.  (Presumably used when decoding
 * backslash-octal escapes in text input; call sites are not in this excerpt.)
 */
#define ISOCTAL(c)	('0' <= (c) && (c) <= '7')
#define OCTVALUE(c)	((c) - '0')
/*
 * NOTE(review): this excerpt appears to be missing lines here (the embedded
 * line numbers skip).  The comment delimiters, the declaration lines and
 * closing braces of the CopyDest and EolType enums are absent, and no
 * EolType members are visible.  Visible CopyDest members: COPY_FILE
 * (to/from a file), COPY_OLD_FE (frontend, 2.0 protocol), COPY_NEW_FE
 * (frontend, 3.0 protocol).
 */
56 * Represents the different source/dest cases we need to worry about at
61 COPY_FILE, /* to/from file */
62 COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
63 COPY_NEW_FE /* to/from frontend (3.0 protocol) */
67 * Represents the end-of-line terminator type of the input
/*
 * NOTE(review): the CopyStateData definition below is incomplete in this
 * excerpt (embedded line numbers skip).  The following are not visible:
 *  - the comment delimiters and the opening brace;
 *  - the declarations that follow the "field raw data pointers found by
 *    COPY FROM" comment;
 *  - the raw_buf pointer itself, which REFILL_LINEBUF and CopyLoadRawBuf
 *    dereference;
 *  - the closing line of the typedef.
 * Confirm against the full source before editing fields.
 */
78 * This struct contains all the state variables used throughout a COPY
79 * operation. For simplicity, we use the same struct for all variants of COPY,
80 * even though some fields are used in only some cases.
82 * Multi-byte encodings: all supported client-side encodings encode multi-byte
83 * characters by having the first byte's high bit set. Subsequent bytes of the
84 * character can have the high bit not set. When scanning data in such an
85 * encoding to look for a match to a single-byte (ie ASCII) character, we must
86 * use the full pg_encoding_mblen() machinery to skip over multibyte
87 * characters, else we might find a false match to a trailing byte. In
88 * supported server encodings, there is no possibility of a false match, and
89 * it's faster to make useless comparisons to trailing bytes than it is to
90 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
91 * when we have to do it the hard way.
93 typedef struct CopyStateData
95 /* low-level state data */
96 CopyDest copy_dest; /* type of copy source/destination */
97 FILE *copy_file; /* used if copy_dest == COPY_FILE */
98 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
99 * dest == COPY_NEW_FE in COPY FROM */
100 bool fe_eof; /* true if detected end of copy data */
101 EolType eol_type; /* EOL type of input */
102 int file_encoding; /* file or remote side's character encoding */
103 bool need_transcoding; /* file encoding diff from server? */
104 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
106 /* parameters from the COPY command */
107 Relation rel; /* relation to copy to or from */
108 QueryDesc *queryDesc; /* executable query to copy from */
109 List *attnumlist; /* integer list of attnums to copy */
110 char *filename; /* filename, or NULL for STDIN/STDOUT */
111 bool binary; /* binary format? */
112 bool oids; /* include OIDs? */
113 bool freeze; /* freeze rows on loading? */
114 bool csv_mode; /* Comma Separated Value format? */
115 bool header_line; /* CSV header line? */
116 char *null_print; /* NULL marker string (server encoding!) */
117 int null_print_len; /* length of same */
118 char *null_print_client; /* same converted to file encoding */
119 char *delim; /* column delimiter (must be 1 byte) */
120 char *quote; /* CSV quote char (must be 1 byte) */
121 char *escape; /* CSV escape char (must be 1 byte) */
122 List *force_quote; /* list of column names */
123 bool force_quote_all; /* FORCE QUOTE *? */
124 bool *force_quote_flags; /* per-column CSV FQ flags */
125 List *force_notnull; /* list of column names */
126 bool *force_notnull_flags; /* per-column CSV FNN flags */
127 bool convert_selectively; /* do selective binary conversion? */
128 List *convert_select; /* list of column names (can be NIL) */
129 bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
131 /* these are just for error messages, see CopyFromErrorCallback */
132 const char *cur_relname; /* table name for error messages */
133 int cur_lineno; /* line number for error messages */
134 const char *cur_attname; /* current att for error messages */
135 const char *cur_attval; /* current att value for error messages */
138 * Working state for COPY TO/FROM
140 MemoryContext copycontext; /* per-copy execution context */
143 * Working state for COPY TO
145 FmgrInfo *out_functions; /* lookup info for output functions */
146 MemoryContext rowcontext; /* per-row evaluation context */
149 * Working state for COPY FROM
151 AttrNumber num_defaults;
153 FmgrInfo oid_in_function;
155 FmgrInfo *in_functions; /* array of input functions for each attrs */
156 Oid *typioparams; /* array of element types for in_functions */
157 int *defmap; /* array of default att numbers */
158 ExprState **defexprs; /* array of default att expressions */
159 bool volatile_defexprs; /* is any of defexprs volatile? */
162 * These variables are used to reduce overhead in textual COPY FROM.
164 * attribute_buf holds the separated, de-escaped text for each field of
165 * the current line. The CopyReadAttributes functions return arrays of
166 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
167 * the buffer on each cycle.
169 StringInfoData attribute_buf;
171 /* field raw data pointers found by COPY FROM */
177 * Similarly, line_buf holds the whole input line being processed. The
178 * input cycle is first to read the whole line into line_buf, convert it
179 * to server encoding there, and then extract the individual attribute
180 * fields into attribute_buf. line_buf is preserved unmodified so that we
181 * can display it in error messages if appropriate.
183 StringInfoData line_buf;
184 bool line_buf_converted; /* converted to server encoding? */
187 * Finally, raw_buf holds raw data read from the data source (file or
188 * client connection). CopyReadLine parses this data sufficiently to
189 * locate line boundaries, then transfers the data to line_buf and
190 * converts it. Note: we guarantee that there is a \0 at
191 * raw_buf[raw_buf_len].
193 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
195 int raw_buf_index; /* next byte to process */
196 int raw_buf_len; /* total # of bytes stored */
/*
 * Per-command state for emitting the result of COPY (SELECT) TO through a
 * DestReceiver: pub holds the receiver's function pointers, cstate the COPY
 * state, and processed counts the tuples handled.
 * NOTE(review): the struct's opening and closing lines, and therefore its
 * type name, are not in this excerpt.  pub is presumably the first member
 * so the struct can be cast to and from DestReceiver *; confirm against the
 * receiver callbacks.
 */
199 /* DestReceiver for COPY (SELECT) TO */
202 DestReceiver pub; /* publicly-known function pointers */
203 CopyState cstate; /* CopyStateData for the command */
204 uint64 processed; /* # of tuples processed */
/*
 * NOTE(review): the macros below are fragments in this excerpt.  The
 * comment delimiters and the "if (1) { ... } else ((void) 0)" wrapper
 * lines described in the comment are not visible.  Some body statements
 * (presumably the continue/break actions the macro names promise) are
 * also missing.
 * Every macro expands to code that uses names local to the caller
 * (raw_buf_ptr, prev_raw_ptr, copy_buf_len, hit_eof, cstate, and the
 * not_end_of_copy label).  They can only be used inside a function that
 * declares those names.
 */
209 * These macros centralize code used to process line_buf and raw_buf buffers.
210 * They are macros because they often do continue/break control and to avoid
211 * function call overhead in tight COPY loops.
213 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
214 * prevent the continue/break processing from working. We end the "if (1)"
215 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
216 * any "else" in the calling code, and to avoid any compiler warnings about
217 * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
221 * This keeps the character read at the top of the loop in the buffer
222 * even if there is more than one read-ahead.
224 #define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
227 if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
229 raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
235 /* This consumes the remainder of the buffer and breaks */
236 #define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
239 if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
242 raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
243 /* backslash just before EOF, treat as data char */ \
250 * Transfer any approved data to line_buf; must do this to be sure
251 * there is some room in raw_buf.
253 #define REFILL_LINEBUF \
256 if (raw_buf_ptr > cstate->raw_buf_index) \
258 appendBinaryStringInfo(&cstate->line_buf, \
259 cstate->raw_buf + cstate->raw_buf_index, \
260 raw_buf_ptr - cstate->raw_buf_index); \
261 cstate->raw_buf_index = raw_buf_ptr; \
265 /* Undo any read-ahead and jump out of the block. */
266 #define NO_END_OF_COPY_GOTO \
269 raw_buf_ptr = prev_raw_ptr + 1; \
270 goto not_end_of_copy; \
/*
 * Signature bytes of the binary COPY format: "PGCOPY", newline, 0xFF,
 * carriage return, newline, and a NUL byte, 11 bytes in total.  The array
 * is sized exactly, so the explicit '\0' is the only NUL and no implicit
 * terminator follows it.  Treat it as raw bytes and compare with memcmp
 * over sizeof(BinarySignature).  (Presumably written at, or checked
 * against, the start of binary COPY data; those call sites are not in
 * this excerpt.)
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
/*
 * NOTE(review): two prototypes below are truncated in this excerpt.
 *  - CopyReadBinaryAttribute stops after "int32 typmod,".
 *  - CopyGetAttnums stops after "Relation rel,".
 * The final parameter line of each is missing; compare with the function
 * definitions before relying on these signatures.
 */
276 /* non-export function prototypes */
277 static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
278 const char *queryString, List *attnamelist, List *options);
279 static void EndCopy(CopyState cstate);
280 static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
281 const char *filename, List *attnamelist, List *options);
282 static void EndCopyTo(CopyState cstate);
283 static uint64 DoCopyTo(CopyState cstate);
284 static uint64 CopyTo(CopyState cstate);
285 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
286 Datum *values, bool *nulls);
287 static uint64 CopyFrom(CopyState cstate);
288 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
289 CommandId mycid, int hi_options,
290 ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
291 BulkInsertState bistate,
292 int nBufferedTuples, HeapTuple *bufferedTuples);
293 static bool CopyReadLine(CopyState cstate);
294 static bool CopyReadLineText(CopyState cstate);
295 static int CopyReadAttributesText(CopyState cstate);
296 static int CopyReadAttributesCSV(CopyState cstate);
297 static Datum CopyReadBinaryAttribute(CopyState cstate,
298 int column_no, FmgrInfo *flinfo,
299 Oid typioparam, int32 typmod,
301 static void CopyAttributeOutText(CopyState cstate, char *string);
302 static void CopyAttributeOutCSV(CopyState cstate, char *string,
303 bool use_quote, bool single_attr);
304 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
306 static char *limit_printout_length(const char *str);
308 /* Low-level communications functions */
309 static void SendCopyBegin(CopyState cstate);
310 static void ReceiveCopyBegin(CopyState cstate);
311 static void SendCopyEnd(CopyState cstate);
312 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
313 static void CopySendString(CopyState cstate, const char *str);
314 static void CopySendChar(CopyState cstate, char c);
315 static void CopySendEndOfRow(CopyState cstate);
316 static int CopyGetData(CopyState cstate, void *databuf,
317 int minread, int maxread);
318 static void CopySendInt32(CopyState cstate, int32 val);
319 static bool CopyGetInt32(CopyState cstate, int32 *val);
320 static void CopySendInt16(CopyState cstate, int16 val);
321 static bool CopyGetInt16(CopyState cstate, int16 *val);
325 * Send copy start/stop messages for frontend copies. These have changed
326 * in past protocol redesigns.
/*
 * SendCopyBegin: announce the start of outgoing copy data to the frontend.
 *  - Protocol >= 3: build an 'H' message holding the overall format
 *    (0 = text, 1 = binary), the column count, and one format code per
 *    column.  Then set copy_dest = COPY_NEW_FE.
 *  - Protocol 2: raise FEATURE_NOT_SUPPORTED ("COPY BINARY is not
 *    supported ..."; the guarding condition is not visible), send an
 *    empty 'H' message, and set copy_dest = COPY_OLD_FE.
 *  - Older protocols: the same, but the empty message type is 'B'.
 * NOTE(review): lines are missing from this excerpt (the embedded line
 * numbers skip).  Among them:
 *  - the return type, the braces, and the declarations of buf and i;
 *  - presumably the call that finishes and sends buf;
 *  - the conditions guarding the binary-mode errors;
 *  - the statement under each "grottiness" comment.
 * Confirm against the full source.
 */
329 SendCopyBegin(CopyState cstate)
331 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
335 int natts = list_length(cstate->attnumlist);
336 int16 format = (cstate->binary ? 1 : 0);
339 pq_beginmessage(&buf, 'H');
340 pq_sendbyte(&buf, format); /* overall format */
341 pq_sendint(&buf, natts, 2);
342 for (i = 0; i < natts; i++)
343 pq_sendint(&buf, format, 2); /* per-column formats */
345 cstate->copy_dest = COPY_NEW_FE;
347 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
352 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
353 errmsg("COPY BINARY is not supported to stdout or from stdin")));
354 pq_putemptymessage('H');
355 /* grottiness needed for old COPY OUT protocol */
357 cstate->copy_dest = COPY_OLD_FE;
364 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
365 errmsg("COPY BINARY is not supported to stdout or from stdin")));
366 pq_putemptymessage('B');
367 /* grottiness needed for old COPY OUT protocol */
369 cstate->copy_dest = COPY_OLD_FE;
/*
 * ReceiveCopyBegin: tell the frontend to start sending copy-in data.
 *  - Protocol >= 3: build a 'G' message holding the overall format, the
 *    column count and per-column format codes.  Set copy_dest =
 *    COPY_NEW_FE and allocate cstate->fe_msgbuf, which CopyGetData uses
 *    to receive incoming messages.
 *  - Protocol 2: send an empty 'G' message; copy_dest = COPY_OLD_FE.
 *  - Older protocols: send an empty 'D' message; copy_dest = COPY_OLD_FE.
 * Both old-protocol branches contain a "COPY BINARY is not supported"
 * error whose guard condition is not visible here.
 * NOTE(review): the return type, braces, local declarations, and the flush
 * announced by the final comment are missing from this excerpt.
 */
374 ReceiveCopyBegin(CopyState cstate)
376 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
380 int natts = list_length(cstate->attnumlist);
381 int16 format = (cstate->binary ? 1 : 0);
384 pq_beginmessage(&buf, 'G');
385 pq_sendbyte(&buf, format); /* overall format */
386 pq_sendint(&buf, natts, 2);
387 for (i = 0; i < natts; i++)
388 pq_sendint(&buf, format, 2); /* per-column formats */
390 cstate->copy_dest = COPY_NEW_FE;
391 cstate->fe_msgbuf = makeStringInfo();
393 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
398 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
399 errmsg("COPY BINARY is not supported to stdout or from stdin")));
400 pq_putemptymessage('G');
401 cstate->copy_dest = COPY_OLD_FE;
408 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
409 errmsg("COPY BINARY is not supported to stdout or from stdin")));
410 pq_putemptymessage('D');
411 cstate->copy_dest = COPY_OLD_FE;
413 /* We *must* flush here to ensure FE knows it can send. */
/*
 * SendCopyEnd: finish outgoing copy data to the frontend.
 *  - COPY_NEW_FE: the row buffer must already be empty (asserted), and an
 *    empty 'c' (CopyDone) message is sent.
 *  - Otherwise: the old-protocol "\." marker is emitted as a final row via
 *    CopySendEndOfRow, then pq_endcopyout(false) is called.
 * NOTE(review): the return type, braces and the "else" line are missing
 * from this excerpt.
 */
418 SendCopyEnd(CopyState cstate)
420 if (cstate->copy_dest == COPY_NEW_FE)
422 /* Shouldn't have any unsent data */
423 Assert(cstate->fe_msgbuf->len == 0);
424 /* Send Copy Done message */
425 pq_putemptymessage('c');
429 CopySendData(cstate, "\\.", 2);
430 /* Need to flush out the trailer (this also appends a newline) */
431 CopySendEndOfRow(cstate);
432 pq_endcopyout(false);
437 * CopySendData sends output data to the destination (file or frontend)
438 * CopySendString does the same for null-terminated strings
439 * CopySendChar does the same for single characters
440 * CopySendEndOfRow does the appropriate thing at end of each data row
441 * (data is not actually flushed except by CopySendEndOfRow)
443 * NB: no data conversion is applied by these functions
/*
 * CopySendData: append datasize bytes from databuf to cstate->fe_msgbuf.
 * Nothing reaches the destination until CopySendEndOfRow writes and resets
 * that buffer.
 * NOTE(review): the "static void" line and the braces are missing from
 * this excerpt (the prototype declares it static void).
 */
447 CopySendData(CopyState cstate, const void *databuf, int datasize)
449 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
/*
 * CopySendString: append the bytes of NUL-terminated str (excluding the
 * terminator) to cstate->fe_msgbuf.
 * NOTE(review): the "static void" line and the braces are missing from
 * this excerpt.
 */
453 CopySendString(CopyState cstate, const char *str)
455 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
/*
 * CopySendChar: append the single byte c to cstate->fe_msgbuf.  It uses the
 * macro form of the append, presumably to avoid a function call per
 * character.
 * NOTE(review): the "static void" line and the braces are missing from
 * this excerpt.
 */
459 CopySendChar(CopyState cstate, char c)
461 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
/*
 * CopySendEndOfRow: write out the row accumulated in cstate->fe_msgbuf,
 * then reset the buffer.  The case labels are not visible; each segment's
 * destination is inferred from the I/O call it makes.
 *  - File segment: append a platform-dependent line terminator ('\n' or
 *    "\r\n"), then fwrite the whole buffer.  A short write or ferror
 *    raises a file-access error.
 *  - pq_putbytes segment (old protocol): append '\n' and send the raw
 *    bytes.  Failure is treated as fatal because connection sync cannot be
 *    recovered.
 *  - pq_putmessage segment (new protocol): append '\n' and send the row as
 *    one CopyData ('d') message.
 * NOTE(review): the following are missing from this excerpt:
 *  - the case labels;
 *  - the condition(s) guarding the newline appends (presumably "not
 *    binary mode"; confirm);
 *  - the platform #if lines;
 *  - the ereport openers.
 * NOTE(review): fwrite() returns 0 when fe_msgbuf->len is 0, which this
 * check would report as a write error.  That looks unreachable if every
 * row carries content; confirm.
 */
465 CopySendEndOfRow(CopyState cstate)
467 StringInfo fe_msgbuf = cstate->fe_msgbuf;
469 switch (cstate->copy_dest)
474 /* Default line termination depends on platform */
476 CopySendChar(cstate, '\n');
478 CopySendString(cstate, "\r\n");
482 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
483 cstate->copy_file) != 1 ||
484 ferror(cstate->copy_file))
486 (errcode_for_file_access(),
487 errmsg("could not write to COPY file: %m")));
490 /* The FE/BE protocol uses \n as newline for all platforms */
492 CopySendChar(cstate, '\n');
494 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
496 /* no hope of recovering connection sync, so FATAL */
498 (errcode(ERRCODE_CONNECTION_FAILURE),
499 errmsg("connection lost during COPY to stdout")));
503 /* The FE/BE protocol uses \n as newline for all platforms */
505 CopySendChar(cstate, '\n');
507 /* Dump the accumulated row as one CopyData message */
508 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
512 resetStringInfo(fe_msgbuf);
516 * CopyGetData reads data from the source (file or frontend)
518 * We attempt to read at least minread, and at most maxread, bytes from
519 * the source. The actual number of bytes read is returned; if this is
520 * less than minread, EOF was detected.
522 * Note: when copying from the frontend, we expect a proper EOF mark per
523 * protocol; if the frontend simply drops the connection, we raise error.
524 * It seems unwise to allow the COPY IN to complete normally in that case.
526 * NB: no data conversion is applied here.
/*
 * Per destination, as far as this excerpt shows:
 *  - File: fread up to maxread bytes; ferror raises a file-access error.
 *  - Old protocol: read exactly minread bytes with pq_getbytes.  EOF is an
 *    error because only a "\." terminator may end the stream.
 *  - New protocol: while more bytes are wanted and fe_eof is unset, receive
 *    messages into fe_msgbuf as needed:
 *      'd' (CopyData) supplies bytes;
 *      'c' (CopyDone) sets fe_eof;
 *      'f' (CopyFail) raises QUERY_CANCELED with the client's text;
 *      'H' (Flush) and presumably 'S' (Sync) are ignored;
 *      any other type is a protocol violation.
 *    Available bytes are then copied into databuf.
 * NOTE(review): many lines are missing from this excerpt, notably:
 *  - the clamping of avail to maxread;
 *  - the updates of maxread/bytesread after pq_copymsgbytes;
 *  - the bytesread assignment in the old-protocol path.
 * As visible, the new-protocol loop would never advance; confirm against
 * the full source.
 */
529 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
533 switch (cstate->copy_dest)
536 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
537 if (ferror(cstate->copy_file))
539 (errcode_for_file_access(),
540 errmsg("could not read from COPY file: %m")));
545 * We cannot read more than minread bytes (which in practice is 1)
546 * because old protocol doesn't have any clear way of separating
547 * the COPY stream from following data. This is slow, but not any
548 * slower than the code path was originally, and we don't care
549 * much anymore about the performance of old protocol.
551 if (pq_getbytes((char *) databuf, minread))
553 /* Only a \. terminator is legal EOF in old protocol */
555 (errcode(ERRCODE_CONNECTION_FAILURE),
556 errmsg("unexpected EOF on client connection with an open transaction")));
561 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
565 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
567 /* Try to receive another message */
571 mtype = pq_getbyte();
574 (errcode(ERRCODE_CONNECTION_FAILURE),
575 errmsg("unexpected EOF on client connection with an open transaction")));
576 if (pq_getmessage(cstate->fe_msgbuf, 0))
578 (errcode(ERRCODE_CONNECTION_FAILURE),
579 errmsg("unexpected EOF on client connection with an open transaction")));
582 case 'd': /* CopyData */
584 case 'c': /* CopyDone */
585 /* COPY IN correctly terminated by frontend */
586 cstate->fe_eof = true;
588 case 'f': /* CopyFail */
590 (errcode(ERRCODE_QUERY_CANCELED),
591 errmsg("COPY from stdin failed: %s",
592 pq_getmsgstring(cstate->fe_msgbuf))));
594 case 'H': /* Flush */
598 * Ignore Flush/Sync for the convenience of client
599 * libraries (such as libpq) that may send those
600 * without noticing that the command they just
606 (errcode(ERRCODE_PROTOCOL_VIOLATION),
607 errmsg("unexpected message type 0x%02X during COPY from stdin",
612 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
615 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
616 databuf = (void *) ((char *) databuf + avail);
628 * These functions do apply some data conversion
632 * CopySendInt32 sends an int32 in network byte order
/*
 * Converts val with htonl and appends the resulting bytes via CopySendData.
 * NOTE(review): the "static void" line, the braces and the declaration of
 * buf are missing from this excerpt (presumably uint32, given the htonl
 * result; confirm).
 */
635 CopySendInt32(CopyState cstate, int32 val)
639 buf = htonl((uint32) val);
640 CopySendData(cstate, &buf, sizeof(buf));
644 * CopyGetInt32 reads an int32 that appears in network byte order
646 * Returns true if OK, false if EOF
/*
 * Requests exactly sizeof(buf) bytes from CopyGetData.  On a short read,
 * *val is set to 0; otherwise it is set to the ntohl-converted value.
 * NOTE(review): the return type, braces, the buf declaration and both
 * return statements (presumably false / true, per the comment above) are
 * missing from this excerpt.
 */
649 CopyGetInt32(CopyState cstate, int32 *val)
653 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
655 *val = 0; /* suppress compiler warning */
658 *val = (int32) ntohl(buf);
663 * CopySendInt16 sends an int16 in network byte order
/*
 * Converts val with htons and appends the resulting bytes via CopySendData.
 * NOTE(review): the "static void" line, the braces and the declaration of
 * buf are missing from this excerpt.
 */
666 CopySendInt16(CopyState cstate, int16 val)
670 buf = htons((uint16) val);
671 CopySendData(cstate, &buf, sizeof(buf));
675 * CopyGetInt16 reads an int16 that appears in network byte order
/*
 * Requests exactly sizeof(buf) bytes from CopyGetData.  On a short read,
 * *val is set to 0; otherwise it is set to the ntohs-converted value.
 * NOTE(review): the return type, braces, the buf declaration and the
 * return statements (presumably false on short read, true otherwise, like
 * CopyGetInt32) are missing from this excerpt.
 */
678 CopyGetInt16(CopyState cstate, int16 *val)
682 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
684 *val = 0; /* suppress compiler warning */
687 *val = (int16) ntohs(buf);
693 * CopyLoadRawBuf loads some more data into raw_buf
695 * Returns TRUE if able to obtain at least one more byte, else FALSE.
697 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
698 * down to the start of the buffer and then we load more data after that.
699 * This case is used only when a frontend multibyte character crosses a
700 * bufferload boundary.
/*
 * NOTE(review): lines are missing from this excerpt.  Among them:
 *  - the return type, braces and local declarations;
 *  - the final argument line of the memmove call;
 *  - the "else" line;
 *  - whatever follows the CopyGetData call.
 * As visible, raw_buf_len would be set to the count of carried-over bytes
 * only.  The full source presumably adds inbytes to nbytes first; confirm.
 * Writing '\0' at raw_buf[nbytes] relies on raw_buf being allocated with
 * RAW_BUF_SIZE + 1 bytes, since nbytes can reach RAW_BUF_SIZE.
 */
703 CopyLoadRawBuf(CopyState cstate)
708 if (cstate->raw_buf_index < cstate->raw_buf_len)
710 /* Copy down the unprocessed data */
711 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
712 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
716 nbytes = 0; /* no data need be saved */
718 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
719 1, RAW_BUF_SIZE - nbytes);
721 cstate->raw_buf[nbytes] = '\0';
722 cstate->raw_buf_index = 0;
723 cstate->raw_buf_len = nbytes;
724 return (inbytes > 0);
729 * DoCopy executes the SQL COPY statement
731 * Either unload or reload contents of table <relation>, depending on <from>.
732 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
733 * we also support copying the output of an arbitrary SELECT query.
735 * If <pipe> is false, transfer is between the table and the file named
736 * <filename>. Otherwise, transfer is between the table and our regular
737 * input/output stream. The latter could be either stdin/stdout or a
738 * socket, depending on whether we're running under Postmaster control.
740 * Do not allow a Postgres user without superuser privilege to read from
741 * or write to a file.
743 * Do not allow the copy if user doesn't have proper permission to access
744 * the table or the specifically requested columns.
/*
 * Visible flow:
 *  - File COPY (stmt->filename != NULL) requires superuser.
 *  - In the relation branch:
 *      - the table is opened with RowExclusiveLock (FROM) or
 *        AccessShareLock (TO);
 *      - a RangeTblEntry requesting ACL_INSERT or ACL_SELECT is built, with
 *        each copied column added to modifiedCols or selectedCols;
 *      - ExecCheckRTPerms checks the permissions.
 *  - COPY FROM calls PreventCommandIfReadOnly unless the table is a local
 *    temp table, then runs BeginCopyFrom + CopyFrom.
 *  - COPY TO runs BeginCopyTo + DoCopyTo.
 *  - The relation is closed at the end, keeping its lock for COPY FROM.
 * NOTE(review): many lines are missing from this excerpt (the embedded
 * line numbers skip), including:
 *  - the return type, declarations and braces;
 *  - the if/else lines that choose between branches (including the
 *    modifiedCols/selectedCols choice);
 *  - the COPY (SELECT) path;
 *  - any cleanup or return statements;
 *  - presumably a NULL check guarding heap_close in the query case.
 * Confirm against the full source.
 */
747 DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
750 bool is_from = stmt->is_from;
751 bool pipe = (stmt->filename == NULL);
755 /* Disallow file COPY except to superusers. */
756 if (!pipe && !superuser())
758 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
759 errmsg("must be superuser to COPY to or from a file"),
760 errhint("Anyone can COPY to stdout or from stdin. "
761 "psql's \\copy command also works for anyone.")));
766 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
771 Assert(!stmt->query);
773 /* Open and lock the relation, using the appropriate lock type. */
774 rel = heap_openrv(stmt->relation,
775 (is_from ? RowExclusiveLock : AccessShareLock));
777 relid = RelationGetRelid(rel);
779 rte = makeNode(RangeTblEntry);
780 rte->rtekind = RTE_RELATION;
781 rte->relid = RelationGetRelid(rel);
782 rte->relkind = rel->rd_rel->relkind;
783 rte->requiredPerms = required_access;
785 tupDesc = RelationGetDescr(rel);
786 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
787 foreach(cur, attnums)
789 int attno = lfirst_int(cur) -
790 FirstLowInvalidHeapAttributeNumber;
793 rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
795 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
797 ExecCheckRTPerms(list_make1(rte), true);
811 /* check read-only transaction */
812 if (XactReadOnly && !rel->rd_islocaltemp)
813 PreventCommandIfReadOnly("COPY FROM");
815 cstate = BeginCopyFrom(rel, stmt->filename,
816 stmt->attlist, stmt->options);
817 *processed = CopyFrom(cstate); /* copy from file to database */
822 cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
823 stmt->attlist, stmt->options);
824 *processed = DoCopyTo(cstate); /* copy from database to file */
829 * Close the relation. If reading, we can release the AccessShareLock we
830 * got; if writing, we should hold the lock until end of transaction to
831 * ensure that updates will be committed before lock is released.
834 heap_close(rel, (is_from ? NoLock : AccessShareLock));
840 * Process the statement option list for COPY.
842 * Scan the options list (a list of DefElem) and transpose the information
843 * into cstate, applying appropriate error checking.
845 * cstate is assumed to be filled with zeroes initially.
847 * This is exported so that external users of the COPY API can sanity-check
848 * a list of options. In that usage, cstate should be passed as NULL
849 * (since external users don't know sizeof(CopyStateData)) and the collected
850 * data is just leaked until CurrentMemoryContext is reset.
852 * Note that additional checking, such as whether column names listed in FORCE
853 * QUOTE actually exist, has to be applied later. This just checks for
854 * self-consistency of the options list.
/*
 * NOTE(review): lines of this function are missing from this excerpt (the
 * embedded line numbers skip).  Among them:
 *  - the return type and the remaining parameters (is_from and options are
 *    used below);
 *  - the NULL test before the palloc0;
 *  - the duplicate-option conditions for oids, freeze, delimiter, quote
 *    and escape;
 *  - the "ereport(ERROR," openers and the final "else";
 *  - the guards around the defaults.
 * As visible, a user-supplied DELIMITER (and, in CSV mode, QUOTE) would be
 * overwritten by the default.  The full source presumably tests for NULL
 * first; confirm before relying on this text.
 */
857 ProcessCopyOptions(CopyState cstate,
861 bool format_specified = false;
864 /* Support external use for option sanity checking */
866 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
868 cstate->file_encoding = -1;
870 /* Extract options from the statement node tree */
871 foreach(option, options)
873 DefElem *defel = (DefElem *) lfirst(option);
875 if (strcmp(defel->defname, "format") == 0)
877 char *fmt = defGetString(defel);
879 if (format_specified)
881 (errcode(ERRCODE_SYNTAX_ERROR),
882 errmsg("conflicting or redundant options")));
883 format_specified = true;
884 if (strcmp(fmt, "text") == 0)
885 /* default format */ ;
886 else if (strcmp(fmt, "csv") == 0)
887 cstate->csv_mode = true;
888 else if (strcmp(fmt, "binary") == 0)
889 cstate->binary = true;
892 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
893 errmsg("COPY format \"%s\" not recognized", fmt)));
895 else if (strcmp(defel->defname, "oids") == 0)
899 (errcode(ERRCODE_SYNTAX_ERROR),
900 errmsg("conflicting or redundant options")));
901 cstate->oids = defGetBoolean(defel);
903 else if (strcmp(defel->defname, "freeze") == 0)
907 (errcode(ERRCODE_SYNTAX_ERROR),
908 errmsg("conflicting or redundant options")));
909 cstate->freeze = defGetBoolean(defel);
911 else if (strcmp(defel->defname, "delimiter") == 0)
915 (errcode(ERRCODE_SYNTAX_ERROR),
916 errmsg("conflicting or redundant options")));
917 cstate->delim = defGetString(defel);
919 else if (strcmp(defel->defname, "null") == 0)
921 if (cstate->null_print)
923 (errcode(ERRCODE_SYNTAX_ERROR),
924 errmsg("conflicting or redundant options")));
925 cstate->null_print = defGetString(defel);
927 else if (strcmp(defel->defname, "header") == 0)
/*
 * NOTE(review): this duplicate check keys off the option's current value.
 * A repeated HEADER whose first occurrence was false is therefore not
 * reported as redundant, unlike "format", which tracks format_specified.
 */
929 if (cstate->header_line)
931 (errcode(ERRCODE_SYNTAX_ERROR),
932 errmsg("conflicting or redundant options")));
933 cstate->header_line = defGetBoolean(defel);
935 else if (strcmp(defel->defname, "quote") == 0)
939 (errcode(ERRCODE_SYNTAX_ERROR),
940 errmsg("conflicting or redundant options")));
941 cstate->quote = defGetString(defel);
943 else if (strcmp(defel->defname, "escape") == 0)
947 (errcode(ERRCODE_SYNTAX_ERROR),
948 errmsg("conflicting or redundant options")));
949 cstate->escape = defGetString(defel);
951 else if (strcmp(defel->defname, "force_quote") == 0)
953 if (cstate->force_quote || cstate->force_quote_all)
955 (errcode(ERRCODE_SYNTAX_ERROR),
956 errmsg("conflicting or redundant options")));
957 if (defel->arg && IsA(defel->arg, A_Star))
958 cstate->force_quote_all = true;
959 else if (defel->arg && IsA(defel->arg, List))
960 cstate->force_quote = (List *) defel->arg;
963 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
964 errmsg("argument to option \"%s\" must be a list of column names",
967 else if (strcmp(defel->defname, "force_not_null") == 0)
969 if (cstate->force_notnull)
971 (errcode(ERRCODE_SYNTAX_ERROR),
972 errmsg("conflicting or redundant options")));
973 if (defel->arg && IsA(defel->arg, List))
974 cstate->force_notnull = (List *) defel->arg;
977 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
978 errmsg("argument to option \"%s\" must be a list of column names",
981 else if (strcmp(defel->defname, "convert_selectively") == 0)
984 * Undocumented, not-accessible-from-SQL option: convert only
985 * the named columns to binary form, storing the rest as NULLs.
986 * It's allowed for the column list to be NIL.
988 if (cstate->convert_selectively)
990 (errcode(ERRCODE_SYNTAX_ERROR),
991 errmsg("conflicting or redundant options")));
992 cstate->convert_selectively = true;
993 if (defel->arg == NULL || IsA(defel->arg, List))
994 cstate->convert_select = (List *) defel->arg;
997 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
998 errmsg("argument to option \"%s\" must be a list of column names",
1001 else if (strcmp(defel->defname, "encoding") == 0)
1003 if (cstate->file_encoding >= 0)
1005 (errcode(ERRCODE_SYNTAX_ERROR),
1006 errmsg("conflicting or redundant options")));
1007 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1008 if (cstate->file_encoding < 0)
1010 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1011 errmsg("argument to option \"%s\" must be a valid encoding name",
1016 (errcode(ERRCODE_SYNTAX_ERROR),
1017 errmsg("option \"%s\" not recognized",
1022 * Check for incompatible options (must do these two before inserting
1025 if (cstate->binary && cstate->delim)
1027 (errcode(ERRCODE_SYNTAX_ERROR),
1028 errmsg("cannot specify DELIMITER in BINARY mode")));
1030 if (cstate->binary && cstate->null_print)
1032 (errcode(ERRCODE_SYNTAX_ERROR),
1033 errmsg("cannot specify NULL in BINARY mode")));
1035 /* Set defaults for omitted options */
1037 cstate->delim = cstate->csv_mode ? "," : "\t";
1039 if (!cstate->null_print)
1040 cstate->null_print = cstate->csv_mode ? "" : "\\N";
1041 cstate->null_print_len = strlen(cstate->null_print);
1043 if (cstate->csv_mode)
1046 cstate->quote = "\"";
1047 if (!cstate->escape)
1048 cstate->escape = cstate->quote;
1051 /* Only single-byte delimiter strings are supported. */
1052 if (strlen(cstate->delim) != 1)
1054 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1055 errmsg("COPY delimiter must be a single one-byte character")));
1057 /* Disallow end-of-line characters */
1058 if (strchr(cstate->delim, '\r') != NULL ||
1059 strchr(cstate->delim, '\n') != NULL)
1061 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1062 errmsg("COPY delimiter cannot be newline or carriage return")));
1064 if (strchr(cstate->null_print, '\r') != NULL ||
1065 strchr(cstate->null_print, '\n') != NULL)
1067 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1068 errmsg("COPY null representation cannot use newline or carriage return")));
1071 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1072 * backslash because it would be ambiguous. We can't allow the other
1073 * cases because data characters matching the delimiter must be
1074 * backslashed, and certain backslash combinations are interpreted
1075 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1076 * more than strictly necessary, but seems best for consistency and
1077 * future-proofing. Likewise we disallow all digits though only octal
1078 * digits are actually dangerous.
1080 if (!cstate->csv_mode &&
1081 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1082 cstate->delim[0]) != NULL)
1084 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1085 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1088 if (!cstate->csv_mode && cstate->header_line)
1090 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1091 errmsg("COPY HEADER available only in CSV mode")));
1094 if (!cstate->csv_mode && cstate->quote != NULL)
1096 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1097 errmsg("COPY quote available only in CSV mode")));
1099 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1101 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1102 errmsg("COPY quote must be a single one-byte character")));
1104 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1106 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1107 errmsg("COPY delimiter and quote must be different")));
1110 if (!cstate->csv_mode && cstate->escape != NULL)
1112 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1113 errmsg("COPY escape available only in CSV mode")));
1115 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1117 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1118 errmsg("COPY escape must be a single one-byte character")));
1120 /* Check force_quote */
1121 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1123 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1124 errmsg("COPY force quote available only in CSV mode")));
1125 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1127 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1128 errmsg("COPY force quote only available using COPY TO")));
1130 /* Check force_notnull */
1131 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1133 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1134 errmsg("COPY force not null available only in CSV mode")));
1135 if (cstate->force_notnull != NIL && !is_from)
1137 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1138 errmsg("COPY force not null only available using COPY FROM")));
1140 /* Don't allow the delimiter to appear in the null string. */
1141 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1143 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1144 errmsg("COPY delimiter must not appear in the NULL specification")));
1146 /* Don't allow the CSV quote char to appear in the null string. */
1147 if (cstate->csv_mode &&
1148 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1150 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1151 errmsg("CSV quote character must not appear in the NULL specification")));
1155 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1157 * Iff <binary>, unload or reload in the binary format, as opposed to the
1158 * more wasteful but more robust and portable text format.
1160 * Iff <oids>, unload or reload the format that includes OID information.
1161 * On input, we accept OIDs whether or not the table has an OID column,
1162 * but silently drop them if it does not. On output, we report an error
1163 * if the user asks for OIDs in a table that has none (not providing an
1164 * OID column might seem friendlier, but could seriously confuse programs).
1166 * If in the text format, delimit columns with delimiter <delim> and print
1167 * NULL values as <null_print>.
1170 BeginCopy(bool is_from,
1173 const char *queryString,
1180 MemoryContext oldcontext;
1182 /* Allocate workspace and zero all fields */
1183 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1186 * We allocate everything used by a cstate in a new memory context. This
1187 * avoids memory leaks during repeated use of COPY in a query.
1189 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1191 ALLOCSET_DEFAULT_MINSIZE,
1192 ALLOCSET_DEFAULT_INITSIZE,
1193 ALLOCSET_DEFAULT_MAXSIZE);
1195 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1197 /* Extract options from the statement node tree */
1198 ProcessCopyOptions(cstate, is_from, options);
1200 /* Process the source/target relation or query */
1207 tupDesc = RelationGetDescr(cstate->rel);
1209 /* Don't allow COPY w/ OIDs to or from a table without them */
1210 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1212 (errcode(ERRCODE_UNDEFINED_COLUMN),
1213 errmsg("table \"%s\" does not have OIDs",
1214 RelationGetRelationName(cstate->rel))));
1226 /* Don't allow COPY w/ OIDs from a select */
1229 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1230 errmsg("COPY (SELECT) WITH OIDS is not supported")));
1233 * Run parse analysis and rewrite. Note this also acquires sufficient
1234 * locks on the source table(s).
1236 * Because the parser and planner tend to scribble on their input, we
1237 * make a preliminary copy of the source querytree. This prevents
1238 * problems in the case that the COPY is in a portal or plpgsql
1239 * function and is executed repeatedly. (See also the same hack in
1240 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1242 rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
1243 queryString, NULL, 0);
1245 /* We don't expect more or less than one result query */
1246 if (list_length(rewritten) != 1)
1247 elog(ERROR, "unexpected rewrite result");
1249 query = (Query *) linitial(rewritten);
1251 /* The grammar allows SELECT INTO, but we don't support that */
1252 if (query->utilityStmt != NULL &&
1253 IsA(query->utilityStmt, CreateTableAsStmt))
1255 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1256 errmsg("COPY (SELECT INTO) is not supported")));
1258 Assert(query->commandType == CMD_SELECT);
1259 Assert(query->utilityStmt == NULL);
1261 /* plan the query */
1262 plan = planner(query, 0, NULL);
1265 * Use a snapshot with an updated command ID to ensure this query sees
1266 * results of any previously executed queries.
1268 PushCopiedSnapshot(GetActiveSnapshot());
1269 UpdateActiveSnapshotCommandId();
1271 /* Create dest receiver for COPY OUT */
1272 dest = CreateDestReceiver(DestCopyOut);
1273 ((DR_copy *) dest)->cstate = cstate;
1275 /* Create a QueryDesc requesting no output */
1276 cstate->queryDesc = CreateQueryDesc(plan, queryString,
1277 GetActiveSnapshot(),
1282 * Call ExecutorStart to prepare the plan for execution.
1284 * ExecutorStart computes a result tupdesc for us
1286 ExecutorStart(cstate->queryDesc, 0);
1288 tupDesc = cstate->queryDesc->tupDesc;
1291 /* Generate or convert list of attributes to process */
1292 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1294 num_phys_attrs = tupDesc->natts;
1296 /* Convert FORCE QUOTE name list to per-column flags, check validity */
1297 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1298 if (cstate->force_quote_all)
1302 for (i = 0; i < num_phys_attrs; i++)
1303 cstate->force_quote_flags[i] = true;
1305 else if (cstate->force_quote)
1310 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1312 foreach(cur, attnums)
1314 int attnum = lfirst_int(cur);
1316 if (!list_member_int(cstate->attnumlist, attnum))
1318 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1319 errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
1320 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1321 cstate->force_quote_flags[attnum - 1] = true;
1325 /* Convert FORCE NOT NULL name list to per-column flags, check validity */
1326 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1327 if (cstate->force_notnull)
1332 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1334 foreach(cur, attnums)
1336 int attnum = lfirst_int(cur);
1338 if (!list_member_int(cstate->attnumlist, attnum))
1340 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1341 errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
1342 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1343 cstate->force_notnull_flags[attnum - 1] = true;
1347 /* Convert convert_selectively name list to per-column flags */
1348 if (cstate->convert_selectively)
1353 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1355 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1357 foreach(cur, attnums)
1359 int attnum = lfirst_int(cur);
1361 if (!list_member_int(cstate->attnumlist, attnum))
1363 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1364 errmsg_internal("selected column \"%s\" not referenced by COPY",
1365 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1366 cstate->convert_select_flags[attnum - 1] = true;
1370 /* Use client encoding when ENCODING option is not specified. */
1371 if (cstate->file_encoding < 0)
1372 cstate->file_encoding = pg_get_client_encoding();
1375 * Set up encoding conversion info. Even if the file and server encodings
1376 * are the same, we must apply pg_any_to_server() to validate data in
1377 * multibyte encodings.
1379 cstate->need_transcoding =
1380 (cstate->file_encoding != GetDatabaseEncoding() ||
1381 pg_database_encoding_max_length() > 1);
1382 /* See Multibyte encoding comment above */
1383 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1385 cstate->copy_dest = COPY_FILE; /* default */
1387 MemoryContextSwitchTo(oldcontext);
1393 * Release resources allocated in a cstate for COPY TO/FROM.
1396 EndCopy(CopyState cstate)
1398 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1400 (errcode_for_file_access(),
1401 errmsg("could not close file \"%s\": %m",
1402 cstate->filename)));
1404 MemoryContextDelete(cstate->copycontext);
1409 * Setup CopyState to read tuples from a table or a query for COPY TO.
1412 BeginCopyTo(Relation rel,
1414 const char *queryString,
1415 const char *filename,
1420 bool pipe = (filename == NULL);
1421 MemoryContext oldcontext;
1423 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1425 if (rel->rd_rel->relkind == RELKIND_VIEW)
1427 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1428 errmsg("cannot copy from view \"%s\"",
1429 RelationGetRelationName(rel)),
1430 errhint("Try the COPY (SELECT ...) TO variant.")));
1431 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1433 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1434 errmsg("cannot copy from foreign table \"%s\"",
1435 RelationGetRelationName(rel)),
1436 errhint("Try the COPY (SELECT ...) TO variant.")));
1437 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1439 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1440 errmsg("cannot copy from sequence \"%s\"",
1441 RelationGetRelationName(rel))));
1444 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1445 errmsg("cannot copy from non-table relation \"%s\"",
1446 RelationGetRelationName(rel))));
1449 cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
1450 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1454 if (whereToSendOutput != DestRemote)
1455 cstate->copy_file = stdout;
1459 mode_t oumask; /* Pre-existing umask value */
1463 * Prevent write to relative path ... too easy to shoot oneself in the
1464 * foot by overwriting a database file ...
1466 if (!is_absolute_path(filename))
1468 (errcode(ERRCODE_INVALID_NAME),
1469 errmsg("relative path not allowed for COPY to file")));
1471 cstate->filename = pstrdup(filename);
1472 oumask = umask(S_IWGRP | S_IWOTH);
1473 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1476 if (cstate->copy_file == NULL)
1478 (errcode_for_file_access(),
1479 errmsg("could not open file \"%s\" for writing: %m",
1480 cstate->filename)));
1482 fstat(fileno(cstate->copy_file), &st);
1483 if (S_ISDIR(st.st_mode))
1485 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1486 errmsg("\"%s\" is a directory", cstate->filename)));
1489 MemoryContextSwitchTo(oldcontext);
1495 * This intermediate routine exists mainly to localize the effects of setjmp
1496 * so we don't need to plaster a lot of variables with "volatile".
1499 DoCopyTo(CopyState cstate)
1501 bool pipe = (cstate->filename == NULL);
1502 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1508 SendCopyBegin(cstate);
1510 processed = CopyTo(cstate);
1513 SendCopyEnd(cstate);
1518 * Make sure we turn off old-style COPY OUT mode upon error. It is
1519 * okay to do this in all cases, since it does nothing if the mode is
1522 pq_endcopyout(true);
1531 * Clean up storage and release resources for COPY TO.
1534 EndCopyTo(CopyState cstate)
1536 if (cstate->queryDesc != NULL)
1538 /* Close down the query and free resources. */
1539 ExecutorFinish(cstate->queryDesc);
1540 ExecutorEnd(cstate->queryDesc);
1541 FreeQueryDesc(cstate->queryDesc);
1542 PopActiveSnapshot();
1545 /* Clean up storage */
1550 * Copy from relation or query TO file.
1553 CopyTo(CopyState cstate)
1557 Form_pg_attribute *attr;
1562 tupDesc = RelationGetDescr(cstate->rel);
1564 tupDesc = cstate->queryDesc->tupDesc;
1565 attr = tupDesc->attrs;
1566 num_phys_attrs = tupDesc->natts;
1567 cstate->null_print_client = cstate->null_print; /* default */
1569 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1570 cstate->fe_msgbuf = makeStringInfo();
1572 /* Get info about the columns we need to process. */
1573 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1574 foreach(cur, cstate->attnumlist)
1576 int attnum = lfirst_int(cur);
1581 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1585 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1588 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1592 * Create a temporary memory context that we can reset once per row to
1593 * recover palloc'd memory. This avoids any problems with leaks inside
1594 * datatype output routines, and should be faster than retail pfree's
1595 * anyway. (We don't need a whole econtext as CopyFrom does.)
1597 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1599 ALLOCSET_DEFAULT_MINSIZE,
1600 ALLOCSET_DEFAULT_INITSIZE,
1601 ALLOCSET_DEFAULT_MAXSIZE);
1605 /* Generate header for a binary copy */
1609 CopySendData(cstate, BinarySignature, 11);
1614 CopySendInt32(cstate, tmp);
1615 /* No header extension */
1617 CopySendInt32(cstate, tmp);
1622 * For non-binary copy, we need to convert null_print to file
1623 * encoding, because it will be sent directly with CopySendString.
1625 if (cstate->need_transcoding)
1626 cstate->null_print_client = pg_server_to_any(cstate->null_print,
1627 cstate->null_print_len,
1628 cstate->file_encoding);
1630 /* if a header has been requested send the line */
1631 if (cstate->header_line)
1633 bool hdr_delim = false;
1635 foreach(cur, cstate->attnumlist)
1637 int attnum = lfirst_int(cur);
1641 CopySendChar(cstate, cstate->delim[0]);
1644 colname = NameStr(attr[attnum - 1]->attname);
1646 CopyAttributeOutCSV(cstate, colname, false,
1647 list_length(cstate->attnumlist) == 1);
1650 CopySendEndOfRow(cstate);
1658 HeapScanDesc scandesc;
1661 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1662 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
1664 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
1667 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
1669 CHECK_FOR_INTERRUPTS();
1671 /* Deconstruct the tuple ... faster than repeated heap_getattr */
1672 heap_deform_tuple(tuple, tupDesc, values, nulls);
1674 /* Format and send the data */
1675 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
1679 heap_endscan(scandesc);
1686 /* run the plan --- the dest receiver will send tuples */
1687 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
1688 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
1693 /* Generate trailer for a binary copy */
1694 CopySendInt16(cstate, -1);
1695 /* Need to flush out the trailer */
1696 CopySendEndOfRow(cstate);
1699 MemoryContextDelete(cstate->rowcontext);
1705 * Emit one row during CopyTo().
1708 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
1710 bool need_delim = false;
1711 FmgrInfo *out_functions = cstate->out_functions;
1712 MemoryContext oldcontext;
1716 MemoryContextReset(cstate->rowcontext);
1717 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1721 /* Binary per-tuple header */
1722 CopySendInt16(cstate, list_length(cstate->attnumlist));
1723 /* Send OID if wanted --- note attnumlist doesn't include it */
1726 /* Hack --- assume Oid is same size as int32 */
1727 CopySendInt32(cstate, sizeof(int32));
1728 CopySendInt32(cstate, tupleOid);
1733 /* Text format has no per-tuple header, but send OID if wanted */
1734 /* Assume digits don't need any quoting or encoding conversion */
1737 string = DatumGetCString(DirectFunctionCall1(oidout,
1738 ObjectIdGetDatum(tupleOid)));
1739 CopySendString(cstate, string);
1744 foreach(cur, cstate->attnumlist)
1746 int attnum = lfirst_int(cur);
1747 Datum value = values[attnum - 1];
1748 bool isnull = nulls[attnum - 1];
1750 if (!cstate->binary)
1753 CopySendChar(cstate, cstate->delim[0]);
1759 if (!cstate->binary)
1760 CopySendString(cstate, cstate->null_print_client);
1762 CopySendInt32(cstate, -1);
1766 if (!cstate->binary)
1768 string = OutputFunctionCall(&out_functions[attnum - 1],
1770 if (cstate->csv_mode)
1771 CopyAttributeOutCSV(cstate, string,
1772 cstate->force_quote_flags[attnum - 1],
1773 list_length(cstate->attnumlist) == 1);
1775 CopyAttributeOutText(cstate, string);
1781 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
1783 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
1784 CopySendData(cstate, VARDATA(outputbytes),
1785 VARSIZE(outputbytes) - VARHDRSZ);
1790 CopySendEndOfRow(cstate);
1792 MemoryContextSwitchTo(oldcontext);
1797 * error context callback for COPY FROM
1799 * The argument for the error context must be CopyState.
1802 CopyFromErrorCallback(void *arg)
1804 CopyState cstate = (CopyState) arg;
1808 /* can't usefully display the data */
1809 if (cstate->cur_attname)
1810 errcontext("COPY %s, line %d, column %s",
1811 cstate->cur_relname, cstate->cur_lineno,
1812 cstate->cur_attname);
1814 errcontext("COPY %s, line %d",
1815 cstate->cur_relname, cstate->cur_lineno);
1819 if (cstate->cur_attname && cstate->cur_attval)
1821 /* error is relevant to a particular column */
1824 attval = limit_printout_length(cstate->cur_attval);
1825 errcontext("COPY %s, line %d, column %s: \"%s\"",
1826 cstate->cur_relname, cstate->cur_lineno,
1827 cstate->cur_attname, attval);
1830 else if (cstate->cur_attname)
1832 /* error is relevant to a particular column, value is NULL */
1833 errcontext("COPY %s, line %d, column %s: null input",
1834 cstate->cur_relname, cstate->cur_lineno,
1835 cstate->cur_attname);
1839 /* error is relevant to a particular line */
1840 if (cstate->line_buf_converted || !cstate->need_transcoding)
1844 lineval = limit_printout_length(cstate->line_buf.data);
1845 errcontext("COPY %s, line %d: \"%s\"",
1846 cstate->cur_relname, cstate->cur_lineno, lineval);
1852 * Here, the line buffer is still in a foreign encoding, and
1853 * indeed it's quite likely that the error is precisely a
1854 * failure to do encoding conversion (ie, bad data). We dare
1855 * not try to convert it, and at present there's no way to
1856 * regurgitate it without conversion. So we have to punt and
1857 * just report the line number.
1859 errcontext("COPY %s, line %d",
1860 cstate->cur_relname, cstate->cur_lineno);
1867 * Make sure we don't print an unreasonable amount of COPY data in a message.
1869 * It would seem a lot easier to just use the sprintf "precision" limit to
1870 * truncate the string. However, some versions of glibc have a bug/misfeature
1871 * that vsnprintf will always fail (return -1) if it is asked to truncate
1872 * a string that contains invalid byte sequences for the current encoding.
1873 * So, do our own truncation. We return a pstrdup'd copy of the input.
1876 limit_printout_length(const char *str)
1878 #define MAX_COPY_DATA_DISPLAY 100
1880 int slen = strlen(str);
1884 /* Fast path if definitely okay */
1885 if (slen <= MAX_COPY_DATA_DISPLAY)
1886 return pstrdup(str);
1888 /* Apply encoding-dependent truncation */
1889 len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
1892 * Truncate, and add "..." to show we truncated the input.
1894 res = (char *) palloc(len + 4);
1895 memcpy(res, str, len);
1896 strcpy(res + len, "...");
1902 * Copy FROM file to relation.
1905 CopyFrom(CopyState cstate)
1911 ResultRelInfo *resultRelInfo;
1912 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
1913 ExprContext *econtext;
1914 TupleTableSlot *myslot;
1915 MemoryContext oldcontext = CurrentMemoryContext;
1917 ErrorContextCallback errcallback;
1918 CommandId mycid = GetCurrentCommandId(true);
1919 int hi_options = 0; /* start with default heap_insert options */
1920 BulkInsertState bistate;
1921 uint64 processed = 0;
1922 bool useHeapMultiInsert;
1923 int nBufferedTuples = 0;
1925 #define MAX_BUFFERED_TUPLES 1000
1926 HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
1927 Size bufferedTuplesSize = 0;
1929 Assert(cstate->rel);
1931 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
1933 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
1935 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1936 errmsg("cannot copy to view \"%s\"",
1937 RelationGetRelationName(cstate->rel))));
1938 else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1940 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1941 errmsg("cannot copy to foreign table \"%s\"",
1942 RelationGetRelationName(cstate->rel))));
1943 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
1945 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1946 errmsg("cannot copy to sequence \"%s\"",
1947 RelationGetRelationName(cstate->rel))));
1950 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1951 errmsg("cannot copy to non-table relation \"%s\"",
1952 RelationGetRelationName(cstate->rel))));
1955 tupDesc = RelationGetDescr(cstate->rel);
1958 * Check to see if we can avoid writing WAL
1960 * If archive logging/streaming is not enabled *and* either
1961 * - table was created in same transaction as this COPY
1962 * - data is being written to relfilenode created in this transaction
1963 * then we can skip writing WAL. It's safe because if the transaction
1964 * doesn't commit, we'll discard the table (or the new relfilenode file).
1965 * If it does commit, we'll have done the heap_sync at the bottom of this
1968 * As mentioned in comments in utils/rel.h, the in-same-transaction test
1969 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
1970 * can be cleared before the end of the transaction. The exact case is
1971 * when a relation sets a new relfilenode twice in same transaction, yet
1972 * the second one fails in an aborted subtransaction, e.g.
1981 * However this is OK since at worst we will fail to make the optimization.
1983 * Also, if the target file is new-in-transaction, we assume that checking
1984 * FSM for free space is a waste of time, even if we must use WAL because
1985 * of archiving. This could possibly be wrong, but it's unlikely.
1987 * The comments for heap_insert and RelationGetBufferForTuple specify that
1988 * skipping WAL logging is only safe if we ensure that our tuples do not
1989 * go into pages containing tuples from any other transactions --- but this
1990 * must be the case if we have a new table or new relfilenode, so we need
1991 * no additional work to enforce that.
1994 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
1995 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
1997 hi_options |= HEAP_INSERT_SKIP_FSM;
1998 if (!XLogIsNeeded())
1999 hi_options |= HEAP_INSERT_SKIP_WAL;
2002 * Optimize if new relfilenode was created in this subxact or
2003 * one of its committed children and we won't see those rows later
2004 * as part of an earlier scan or command. This ensures that if this
2005 * subtransaction aborts then the frozen rows won't be visible
2006 * after xact cleanup. Note that the stronger test of exactly
2007 * which subtransaction created it is crucial for correctness
2008 * of this optimisation.
2010 * As noted above rd_newRelfilenodeSubid is not set in all cases
2011 * where we can apply the optimization, so in those rare cases
2012 * where we cannot honour the request we do so silently.
2014 if (cstate->freeze &&
2015 ThereAreNoPriorRegisteredSnapshots() &&
2016 ThereAreNoReadyPortals() &&
2017 (cstate->rel->rd_newRelfilenodeSubid == GetCurrentSubTransactionId() ||
2018 cstate->rel->rd_createSubid == GetCurrentSubTransactionId()))
2019 hi_options |= HEAP_INSERT_FROZEN;
2023 * We need a ResultRelInfo so we can use the regular executor's
2024 * index-entry-making machinery. (There used to be a huge amount of code
2025 * here that basically duplicated execUtils.c ...)
2027 resultRelInfo = makeNode(ResultRelInfo);
2028 resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
2029 resultRelInfo->ri_RelationDesc = cstate->rel;
2030 resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
2031 if (resultRelInfo->ri_TrigDesc)
2033 resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
2034 palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
2035 resultRelInfo->ri_TrigWhenExprs = (List **)
2036 palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(List *));
2038 resultRelInfo->ri_TrigInstrument = NULL;
2040 ExecOpenIndices(resultRelInfo);
2042 estate->es_result_relations = resultRelInfo;
2043 estate->es_num_result_relations = 1;
2044 estate->es_result_relation_info = resultRelInfo;
2046 /* Set up a tuple slot too */
2047 myslot = ExecInitExtraTupleSlot(estate);
2048 ExecSetSlotDescriptor(myslot, tupDesc);
2049 /* Triggers might need a slot as well */
2050 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2053 * It's more efficient to prepare a bunch of tuples for insertion, and
2054 * insert them in one heap_multi_insert() call, than call heap_insert()
2055 * separately for every tuple. However, we can't do that if there are
2056 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2057 * expressions. Such triggers or expressions might query the table we're
2058 * inserting to, and act differently if the tuples that have already been
2059 * processed and prepared for insertion are not there.
2061 if ((resultRelInfo->ri_TrigDesc != NULL &&
2062 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2063 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2064 cstate->volatile_defexprs)
2066 useHeapMultiInsert = false;
2070 useHeapMultiInsert = true;
2071 bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2074 /* Prepare to catch AFTER triggers. */
2075 AfterTriggerBeginQuery();
2078 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2079 * should do this for COPY, since it's not really an "INSERT" statement as
2080 * such. However, executing these triggers maintains consistency with the
2081 * EACH ROW triggers that we already fire on COPY.
2083 ExecBSInsertTriggers(estate, resultRelInfo);
2085 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2086 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2088 bistate = GetBulkInsertState();
2089 econtext = GetPerTupleExprContext(estate);
2091 /* Set up callback to identify error line number */
2092 errcallback.callback = CopyFromErrorCallback;
2093 errcallback.arg = (void *) cstate;
2094 errcallback.previous = error_context_stack;
2095 error_context_stack = &errcallback;
2099 TupleTableSlot *slot;
2101 Oid loaded_oid = InvalidOid;
2103 CHECK_FOR_INTERRUPTS();
2105 if (nBufferedTuples == 0)
2108 * Reset the per-tuple exprcontext. We can only do this if the
2109 * tuple buffer is empty (calling the context the per-tuple memory
2110 * context is a bit of a misnomer now
2112 ResetPerTupleExprContext(estate);
2115 /* Switch into its memory context */
2116 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2118 if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2121 /* And now we can form the input tuple. */
2122 tuple = heap_form_tuple(tupDesc, values, nulls);
2124 if (loaded_oid != InvalidOid)
2125 HeapTupleSetOid(tuple, loaded_oid);
2127 /* Triggers and stuff need to be invoked in query context. */
2128 MemoryContextSwitchTo(oldcontext);
2130 /* Place tuple in tuple slot --- but slot shouldn't free it */
2132 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2136 /* BEFORE ROW INSERT Triggers */
2137 if (resultRelInfo->ri_TrigDesc &&
2138 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2140 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2142 if (slot == NULL) /* "do nothing" */
2144 else /* trigger might have changed tuple */
2145 tuple = ExecMaterializeSlot(slot);
2150 /* Check the constraints of the tuple */
2151 if (cstate->rel->rd_att->constr)
2152 ExecConstraints(resultRelInfo, slot, estate);
2154 if (useHeapMultiInsert)
2156 /* Add this tuple to the tuple buffer */
2157 bufferedTuples[nBufferedTuples++] = tuple;
2158 bufferedTuplesSize += tuple->t_len;
2161 * If the buffer filled up, flush it. Also flush if the total
2162 * size of all the tuples in the buffer becomes large, to
2163 * avoid using large amounts of memory for the buffers when
2164 * the tuples are exceptionally wide.
2166 if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2167 bufferedTuplesSize > 65535)
2169 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2170 resultRelInfo, myslot, bistate,
2171 nBufferedTuples, bufferedTuples);
2172 nBufferedTuples = 0;
2173 bufferedTuplesSize = 0;
2178 List *recheckIndexes = NIL;
2180 /* OK, store the tuple and create index entries for it */
2181 heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
2183 if (resultRelInfo->ri_NumIndices > 0)
2184 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
2187 /* AFTER ROW INSERT Triggers */
2188 ExecARInsertTriggers(estate, resultRelInfo, tuple,
2191 list_free(recheckIndexes);
2195 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2196 * this is the same definition used by execMain.c for counting
2197 * tuples inserted by an INSERT command.
2203 /* Flush any remaining buffered tuples */
2204 if (nBufferedTuples > 0)
2205 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2206 resultRelInfo, myslot, bistate,
2207 nBufferedTuples, bufferedTuples);
2209 /* Done, clean up */
2210 error_context_stack = errcallback.previous;
2212 FreeBulkInsertState(bistate);
2214 MemoryContextSwitchTo(oldcontext);
2216 /* Execute AFTER STATEMENT insertion triggers */
2217 ExecASInsertTriggers(estate, resultRelInfo);
2219 /* Handle queued AFTER triggers */
2220 AfterTriggerEndQuery(estate);
2225 ExecResetTupleTable(estate->es_tupleTable, false);
2227 ExecCloseIndices(resultRelInfo);
2229 FreeExecutorState(estate);
2232 * If we skipped writing WAL, then we need to sync the heap (but not
2233 * indexes since those use WAL anyway)
2235 if (hi_options & HEAP_INSERT_SKIP_WAL)
2236 heap_sync(cstate->rel);
2242 * A subroutine of CopyFrom, to write the current batch of buffered heap
2243 * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2247 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2248 int hi_options, ResultRelInfo *resultRelInfo,
2249 TupleTableSlot *myslot, BulkInsertState bistate,
2250 int nBufferedTuples, HeapTuple *bufferedTuples)
2252 MemoryContext oldcontext;
2256 * heap_multi_insert leaks memory, so switch to short-lived memory context
2257 * before calling it.
2259 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2260 heap_multi_insert(cstate->rel,
2266 MemoryContextSwitchTo(oldcontext);
2269 * If there are any indexes, update them for all the inserted tuples, and
2270 * run AFTER ROW INSERT triggers.
2272 if (resultRelInfo->ri_NumIndices > 0)
2274 for (i = 0; i < nBufferedTuples; i++)
2276 List *recheckIndexes;
2278 ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2280 ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2282 ExecARInsertTriggers(estate, resultRelInfo,
2285 list_free(recheckIndexes);
2290 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
2293 else if (resultRelInfo->ri_TrigDesc != NULL &&
2294 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
2296 for (i = 0; i < nBufferedTuples; i++)
2297 ExecARInsertTriggers(estate, resultRelInfo,
2304 * Setup to read tuples from a file for COPY FROM.
2306 * 'rel': Used as a template for the tuples
2307 * 'filename': Name of server-local file to read
2308 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2309 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2311 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2314 BeginCopyFrom(Relation rel,
2315 const char *filename,
2320 bool pipe = (filename == NULL);
2322 Form_pg_attribute *attr;
2323 AttrNumber num_phys_attrs,
2325 FmgrInfo *in_functions;
2330 ExprState **defexprs;
2331 MemoryContext oldcontext;
2332 bool volatile_defexprs;
2334 cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
2335 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2337 /* Initialize state variables */
2338 cstate->fe_eof = false;
2339 cstate->eol_type = EOL_UNKNOWN;
2340 cstate->cur_relname = RelationGetRelationName(cstate->rel);
2341 cstate->cur_lineno = 0;
2342 cstate->cur_attname = NULL;
2343 cstate->cur_attval = NULL;
2345 /* Set up variables to avoid per-attribute overhead. */
2346 initStringInfo(&cstate->attribute_buf);
2347 initStringInfo(&cstate->line_buf);
2348 cstate->line_buf_converted = false;
2349 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2350 cstate->raw_buf_index = cstate->raw_buf_len = 0;
2352 tupDesc = RelationGetDescr(cstate->rel);
2353 attr = tupDesc->attrs;
2354 num_phys_attrs = tupDesc->natts;
2356 volatile_defexprs = false;
2359 * Pick up the required catalog information for each attribute in the
2360 * relation, including the input function, the element type (to pass to
2361 * the input function), and info about defaults and constraints. (Which
2362 * input function we use depends on text/binary format choice.)
2364 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2365 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
2366 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
2367 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
2369 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
2371 /* We don't need info for dropped attributes */
2372 if (attr[attnum - 1]->attisdropped)
2375 /* Fetch the input function and typioparam info */
2377 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
2378 &in_func_oid, &typioparams[attnum - 1]);
2380 getTypeInputInfo(attr[attnum - 1]->atttypid,
2381 &in_func_oid, &typioparams[attnum - 1]);
2382 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
2384 /* Get default info if needed */
2385 if (!list_member_int(cstate->attnumlist, attnum))
2387 /* attribute is NOT to be copied from input */
2388 /* use default value if one exists */
2389 Node *defexpr = build_column_default(cstate->rel, attnum);
2391 if (defexpr != NULL)
2393 /* Initialize expressions in copycontext. */
2394 defexprs[num_defaults] = ExecInitExpr(
2395 expression_planner((Expr *) defexpr), NULL);
2396 defmap[num_defaults] = attnum - 1;
2399 if (!volatile_defexprs)
2400 volatile_defexprs = contain_volatile_functions(defexpr);
2405 /* We keep those variables in cstate. */
2406 cstate->in_functions = in_functions;
2407 cstate->typioparams = typioparams;
2408 cstate->defmap = defmap;
2409 cstate->defexprs = defexprs;
2410 cstate->volatile_defexprs = volatile_defexprs;
2411 cstate->num_defaults = num_defaults;
2415 if (whereToSendOutput == DestRemote)
2416 ReceiveCopyBegin(cstate);
2418 cstate->copy_file = stdin;
2424 cstate->filename = pstrdup(filename);
2425 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
2427 if (cstate->copy_file == NULL)
2429 (errcode_for_file_access(),
2430 errmsg("could not open file \"%s\" for reading: %m",
2431 cstate->filename)));
2433 fstat(fileno(cstate->copy_file), &st);
2434 if (S_ISDIR(st.st_mode))
2436 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2437 errmsg("\"%s\" is a directory", cstate->filename)));
2440 if (!cstate->binary)
2442 /* must rely on user to tell us... */
2443 cstate->file_has_oids = cstate->oids;
2447 /* Read and verify binary header */
2452 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
2453 memcmp(readSig, BinarySignature, 11) != 0)
2455 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2456 errmsg("COPY file signature not recognized")));
2458 if (!CopyGetInt32(cstate, &tmp))
2460 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2461 errmsg("invalid COPY file header (missing flags)")));
2462 cstate->file_has_oids = (tmp & (1 << 16)) != 0;
2464 if ((tmp >> 16) != 0)
2466 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2467 errmsg("unrecognized critical flags in COPY file header")));
2468 /* Header extension length */
2469 if (!CopyGetInt32(cstate, &tmp) ||
2472 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2473 errmsg("invalid COPY file header (missing length)")));
2474 /* Skip extension header, if present */
2477 if (CopyGetData(cstate, readSig, 1, 1) != 1)
2479 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2480 errmsg("invalid COPY file header (wrong length)")));
2484 if (cstate->file_has_oids && cstate->binary)
2486 getTypeBinaryInputInfo(OIDOID,
2487 &in_func_oid, &cstate->oid_typioparam);
2488 fmgr_info(in_func_oid, &cstate->oid_in_function);
2491 /* create workspace for CopyReadAttributes results */
2492 if (!cstate->binary)
2494 AttrNumber attr_count = list_length(cstate->attnumlist);
2495 int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
2497 cstate->max_fields = nfields;
2498 cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
2501 MemoryContextSwitchTo(oldcontext);
2507 * Read raw fields in the next line for COPY FROM in text or csv mode.
2508 * Return false if no more lines.
2510 * An internal temporary buffer is returned via 'fields'. It is valid until
2511 * the next call of the function. Since the function returns all raw fields
2512 * in the input file, 'nfields' could be different from the number of columns
2515 * NOTE: force_not_null option are not applied to the returned fields.
2518 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
2523 /* only available for text or csv input */
2524 Assert(!cstate->binary);
2526 /* on input just throw the header line away */
2527 if (cstate->cur_lineno == 0 && cstate->header_line)
2529 cstate->cur_lineno++;
2530 if (CopyReadLine(cstate))
2531 return false; /* done */
2534 cstate->cur_lineno++;
2536 /* Actually read the line into memory here */
2537 done = CopyReadLine(cstate);
2540 * EOF at start of line means we're done. If we see EOF after some
2541 * characters, we act as though it was newline followed by EOF, ie,
2542 * process the line and then exit loop on next iteration.
2544 if (done && cstate->line_buf.len == 0)
2547 /* Parse the line into de-escaped field values */
2548 if (cstate->csv_mode)
2549 fldct = CopyReadAttributesCSV(cstate);
2551 fldct = CopyReadAttributesText(cstate);
2553 *fields = cstate->raw_fields;
2559 * Read next tuple from file for COPY FROM. Return false if no more tuples.
2561 * 'econtext' is used to evaluate default expression for each columns not
2562 * read from the file. It can be NULL when no default values are used, i.e.
2563 * when all columns are read from the file.
2565 * 'values' and 'nulls' arrays must be the same length as columns of the
2566 * relation passed to BeginCopyFrom. This function fills the arrays.
2567 * Oid of the tuple is returned with 'tupleOid' separately.
2570 NextCopyFrom(CopyState cstate, ExprContext *econtext,
2571 Datum *values, bool *nulls, Oid *tupleOid)
2574 Form_pg_attribute *attr;
2575 AttrNumber num_phys_attrs,
2577 num_defaults = cstate->num_defaults;
2578 FmgrInfo *in_functions = cstate->in_functions;
2579 Oid *typioparams = cstate->typioparams;
2583 bool file_has_oids = cstate->file_has_oids;
2584 int *defmap = cstate->defmap;
2585 ExprState **defexprs = cstate->defexprs;
2587 tupDesc = RelationGetDescr(cstate->rel);
2588 attr = tupDesc->attrs;
2589 num_phys_attrs = tupDesc->natts;
2590 attr_count = list_length(cstate->attnumlist);
2591 nfields = file_has_oids ? (attr_count + 1) : attr_count;
2593 /* Initialize all values for row to NULL */
2594 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
2595 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
2597 if (!cstate->binary)
2599 char **field_strings;
2605 /* read raw fields in the next line */
2606 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
2609 /* check for overflowing fields */
2610 if (nfields > 0 && fldct > nfields)
2612 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2613 errmsg("extra data after last expected column")));
2617 /* Read the OID field if present */
2620 if (fieldno >= fldct)
2622 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2623 errmsg("missing data for OID column")));
2624 string = field_strings[fieldno++];
2628 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2629 errmsg("null OID in COPY data")));
2630 else if (cstate->oids && tupleOid != NULL)
2632 cstate->cur_attname = "oid";
2633 cstate->cur_attval = string;
2634 *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
2635 CStringGetDatum(string)));
2636 if (*tupleOid == InvalidOid)
2638 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2639 errmsg("invalid OID in COPY data")));
2640 cstate->cur_attname = NULL;
2641 cstate->cur_attval = NULL;
2645 /* Loop to read the user attributes on the line. */
2646 foreach(cur, cstate->attnumlist)
2648 int attnum = lfirst_int(cur);
2651 if (fieldno >= fldct)
2653 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2654 errmsg("missing data for column \"%s\"",
2655 NameStr(attr[m]->attname))));
2656 string = field_strings[fieldno++];
2658 if (cstate->convert_select_flags &&
2659 !cstate->convert_select_flags[m])
2661 /* ignore input field, leaving column as NULL */
2665 if (cstate->csv_mode && string == NULL &&
2666 cstate->force_notnull_flags[m])
2668 /* Go ahead and read the NULL string */
2669 string = cstate->null_print;
2672 cstate->cur_attname = NameStr(attr[m]->attname);
2673 cstate->cur_attval = string;
2674 values[m] = InputFunctionCall(&in_functions[m],
2677 attr[m]->atttypmod);
2680 cstate->cur_attname = NULL;
2681 cstate->cur_attval = NULL;
2684 Assert(fieldno == nfields);
2692 cstate->cur_lineno++;
2694 if (!CopyGetInt16(cstate, &fld_count))
2696 /* EOF detected (end of file, or protocol-level EOF) */
2700 if (fld_count == -1)
2703 * Received EOF marker. In a V3-protocol copy, wait for the
2704 * protocol-level EOF, and complain if it doesn't come
2705 * immediately. This ensures that we correctly handle CopyFail,
2706 * if client chooses to send that now.
2708 * Note that we MUST NOT try to read more data in an old-protocol
2709 * copy, since there is no protocol-level EOF marker then. We
2710 * could go either way for copy from file, but choose to throw
2711 * error if there's data after the EOF marker, for consistency
2712 * with the new-protocol case.
2716 if (cstate->copy_dest != COPY_OLD_FE &&
2717 CopyGetData(cstate, &dummy, 1, 1) > 0)
2719 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2720 errmsg("received copy data after EOF marker")));
2724 if (fld_count != attr_count)
2726 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2727 errmsg("row field count is %d, expected %d",
2728 (int) fld_count, attr_count)));
2734 cstate->cur_attname = "oid";
2736 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
2738 &cstate->oid_in_function,
2739 cstate->oid_typioparam,
2742 if (isnull || loaded_oid == InvalidOid)
2744 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2745 errmsg("invalid OID in COPY data")));
2746 cstate->cur_attname = NULL;
2747 if (cstate->oids && tupleOid != NULL)
2748 *tupleOid = loaded_oid;
2752 foreach(cur, cstate->attnumlist)
2754 int attnum = lfirst_int(cur);
2757 cstate->cur_attname = NameStr(attr[m]->attname);
2759 values[m] = CopyReadBinaryAttribute(cstate,
2765 cstate->cur_attname = NULL;
2770 * Now compute and insert any defaults available for the columns not
2771 * provided by the input data. Anything not processed here or above will
2774 for (i = 0; i < num_defaults; i++)
2777 * The caller must supply econtext and have switched into the
2778 * per-tuple memory context in it.
2780 Assert(econtext != NULL);
2781 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
2783 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
2784 &nulls[defmap[i]], NULL);
2791 * Clean up storage and release resources for COPY FROM.
2794 EndCopyFrom(CopyState cstate)
2796 /* No COPY FROM related resources except memory. */
2802 * Read the next input line and stash it in line_buf, with conversion to
2805 * Result is true if read was terminated by EOF, false if terminated
2806 * by newline. The terminating newline or EOF marker is not included
2807 * in the final value of line_buf.
2810 CopyReadLine(CopyState cstate)
2814 resetStringInfo(&cstate->line_buf);
2816 /* Mark that encoding conversion hasn't occurred yet */
2817 cstate->line_buf_converted = false;
2819 /* Parse data and transfer into line_buf */
2820 result = CopyReadLineText(cstate);
2825 * Reached EOF. In protocol version 3, we should ignore anything
2826 * after \. up to the protocol end of copy data. (XXX maybe better
2827 * not to treat \. as special?)
2829 if (cstate->copy_dest == COPY_NEW_FE)
2833 cstate->raw_buf_index = cstate->raw_buf_len;
2834 } while (CopyLoadRawBuf(cstate));
2840 * If we didn't hit EOF, then we must have transferred the EOL marker
2841 * to line_buf along with the data. Get rid of it.
2843 switch (cstate->eol_type)
2846 Assert(cstate->line_buf.len >= 1);
2847 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2848 cstate->line_buf.len--;
2849 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2852 Assert(cstate->line_buf.len >= 1);
2853 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
2854 cstate->line_buf.len--;
2855 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2858 Assert(cstate->line_buf.len >= 2);
2859 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
2860 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2861 cstate->line_buf.len -= 2;
2862 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2865 /* shouldn't get here */
2871 /* Done reading the line. Convert it to server encoding. */
2872 if (cstate->need_transcoding)
2876 cvt = pg_any_to_server(cstate->line_buf.data,
2877 cstate->line_buf.len,
2878 cstate->file_encoding);
2879 if (cvt != cstate->line_buf.data)
2881 /* transfer converted data back to line_buf */
2882 resetStringInfo(&cstate->line_buf);
2883 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
2888 /* Now it's safe to use the buffer in error messages */
2889 cstate->line_buf_converted = true;
2895 * CopyReadLineText - inner loop of CopyReadLine for text mode
2898 CopyReadLineText(CopyState cstate)
2903 bool need_data = false;
2904 bool hit_eof = false;
2905 bool result = false;
2909 bool first_char_in_line = true;
2910 bool in_quote = false,
2911 last_was_esc = false;
2913 char escapec = '\0';
2915 if (cstate->csv_mode)
2917 quotec = cstate->quote[0];
2918 escapec = cstate->escape[0];
2919 /* ignore special escape processing if it's the same as quotec */
2920 if (quotec == escapec)
2924 mblen_str[1] = '\0';
2927 * The objective of this loop is to transfer the entire next input line
2928 * into line_buf. Hence, we only care for detecting newlines (\r and/or
2929 * \n) and the end-of-copy marker (\.).
2931 * In CSV mode, \r and \n inside a quoted field are just part of the data
2932 * value and are put in line_buf. We keep just enough state to know if we
2933 * are currently in a quoted field or not.
2935 * These four characters, and the CSV escape and quote characters, are
2936 * assumed the same in frontend and backend encodings.
2938 * For speed, we try to move data from raw_buf to line_buf in chunks
2939 * rather than one character at a time. raw_buf_ptr points to the next
2940 * character to examine; any characters from raw_buf_index to raw_buf_ptr
2941 * have been determined to be part of the line, but not yet transferred to
2944 * For a little extra speed within the loop, we copy raw_buf and
2945 * raw_buf_len into local variables.
2947 copy_raw_buf = cstate->raw_buf;
2948 raw_buf_ptr = cstate->raw_buf_index;
2949 copy_buf_len = cstate->raw_buf_len;
2957 * Load more data if needed. Ideally we would just force four bytes
2958 * of read-ahead and avoid the many calls to
2959 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
2960 * does not allow us to read too far ahead or we might read into the
2961 * next data, so we read-ahead only as far we know we can. One
2962 * optimization would be to read-ahead four byte here if
2963 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
2964 * considering the size of the buffer.
2966 if (raw_buf_ptr >= copy_buf_len || need_data)
2971 * Try to read some more data. This will certainly reset
2972 * raw_buf_index to zero, and raw_buf_ptr must go with it.
2974 if (!CopyLoadRawBuf(cstate))
2977 copy_buf_len = cstate->raw_buf_len;
2980 * If we are completely out of data, break out of the loop,
2983 if (copy_buf_len <= 0)
2991 /* OK to fetch a character */
2992 prev_raw_ptr = raw_buf_ptr;
2993 c = copy_raw_buf[raw_buf_ptr++];
2995 if (cstate->csv_mode)
2998 * If character is '\\' or '\r', we may need to look ahead below.
2999 * Force fetch of the next character if we don't already have it.
3000 * We need to do this before changing CSV state, in case one of
3001 * these characters is also the quote or escape character.
3003 * Note: old-protocol does not like forced prefetch, but it's OK
3004 * here since we cannot validly be at EOF.
3006 if (c == '\\' || c == '\r')
3008 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3012 * Dealing with quotes and escapes here is mildly tricky. If the
3013 * quote char is also the escape char, there's no problem - we
3014 * just use the char as a toggle. If they are different, we need
3015 * to ensure that we only take account of an escape inside a
3016 * quoted field and immediately preceding a quote char, and not
3017 * the second in a escape-escape sequence.
3019 if (in_quote && c == escapec)
3020 last_was_esc = !last_was_esc;
3021 if (c == quotec && !last_was_esc)
3022 in_quote = !in_quote;
3024 last_was_esc = false;
3027 * Updating the line count for embedded CR and/or LF chars is
3028 * necessarily a little fragile - this test is probably about the
3029 * best we can do. (XXX it's arguable whether we should do this
3030 * at all --- is cur_lineno a physical or logical count?)
3032 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3033 cstate->cur_lineno++;
3037 if (c == '\r' && (!cstate->csv_mode || !in_quote))
3039 /* Check for \r\n on first line, _and_ handle \r\n. */
3040 if (cstate->eol_type == EOL_UNKNOWN ||
3041 cstate->eol_type == EOL_CRNL)
3044 * If need more data, go back to loop top to load it.
3046 * Note that if we are at EOF, c will wind up as '\0' because
3047 * of the guaranteed pad of raw_buf.
3049 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3052 c = copy_raw_buf[raw_buf_ptr];
3056 raw_buf_ptr++; /* eat newline */
3057 cstate->eol_type = EOL_CRNL; /* in case not set yet */
3061 /* found \r, but no \n */
3062 if (cstate->eol_type == EOL_CRNL)
3064 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3066 errmsg("literal carriage return found in data") :
3067 errmsg("unquoted carriage return found in data"),
3069 errhint("Use \"\\r\" to represent carriage return.") :
3070 errhint("Use quoted CSV field to represent carriage return.")));
3073 * if we got here, it is the first line and we didn't find
3074 * \n, so don't consume the peeked character
3076 cstate->eol_type = EOL_CR;
3079 else if (cstate->eol_type == EOL_NL)
3081 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3083 errmsg("literal carriage return found in data") :
3084 errmsg("unquoted carriage return found in data"),
3086 errhint("Use \"\\r\" to represent carriage return.") :
3087 errhint("Use quoted CSV field to represent carriage return.")));
3088 /* If reach here, we have found the line terminator */
3093 if (c == '\n' && (!cstate->csv_mode || !in_quote))
3095 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3097 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3099 errmsg("literal newline found in data") :
3100 errmsg("unquoted newline found in data"),
3102 errhint("Use \"\\n\" to represent newline.") :
3103 errhint("Use quoted CSV field to represent newline.")));
3104 cstate->eol_type = EOL_NL; /* in case not set yet */
3105 /* If reach here, we have found the line terminator */
3110 * In CSV mode, we only recognize \. alone on a line. This is because
3111 * \. is a valid CSV data value.
3113 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3117 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3118 IF_NEED_REFILL_AND_EOF_BREAK(0);
3121 * get next character
3122 * Note: we do not change c so if it isn't \., we can fall
3123 * through and continue processing for file encoding.
3126 c2 = copy_raw_buf[raw_buf_ptr];
3130 raw_buf_ptr++; /* consume the '.' */
3133 * Note: if we loop back for more data here, it does not
3134 * matter that the CSV state change checks are re-executed; we
3135 * will come back here with no important state changed.
3137 if (cstate->eol_type == EOL_CRNL)
3139 /* Get the next character */
3140 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3141 /* if hit_eof, c2 will become '\0' */
3142 c2 = copy_raw_buf[raw_buf_ptr++];
3146 if (!cstate->csv_mode)
3148 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3149 errmsg("end-of-copy marker does not match previous newline style")));
3151 NO_END_OF_COPY_GOTO;
3153 else if (c2 != '\r')
3155 if (!cstate->csv_mode)
3157 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3158 errmsg("end-of-copy marker corrupt")));
3160 NO_END_OF_COPY_GOTO;
3164 /* Get the next character */
3165 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3166 /* if hit_eof, c2 will become '\0' */
3167 c2 = copy_raw_buf[raw_buf_ptr++];
3169 if (c2 != '\r' && c2 != '\n')
3171 if (!cstate->csv_mode)
3173 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3174 errmsg("end-of-copy marker corrupt")));
3176 NO_END_OF_COPY_GOTO;
3179 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3180 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3181 (cstate->eol_type == EOL_CR && c2 != '\r'))
3184 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3185 errmsg("end-of-copy marker does not match previous newline style")));
3189 * Transfer only the data before the \. into line_buf, then
3190 * discard the data and the \. sequence.
3192 if (prev_raw_ptr > cstate->raw_buf_index)
3193 appendBinaryStringInfo(&cstate->line_buf,
3194 cstate->raw_buf + cstate->raw_buf_index,
3195 prev_raw_ptr - cstate->raw_buf_index);
3196 cstate->raw_buf_index = raw_buf_ptr;
3197 result = true; /* report EOF */
3200 else if (!cstate->csv_mode)
3203 * If we are here, it means we found a backslash followed by
3204 * something other than a period. In non-CSV mode, anything
3205 * after a backslash is special, so we skip over that second
3206 * character too. If we didn't do that \\. would be
3207 * considered an eof-of copy, while in non-CSV mode it is a
3208 * literal backslash followed by a period. In CSV mode,
3209 * backslashes are not special, so we want to process the
3210 * character after the backslash just like a normal character,
3211 * so we don't increment in those cases.
3217 * This label is for CSV cases where \. appears at the start of a
3218 * line, but there is more text after it, meaning it was a data value.
3219 * We are more strict for \. in CSV mode because \. could be a data
3220 * value, while in non-CSV mode, \. cannot be a data value.
3225 * Process all bytes of a multi-byte character as a group.
3227 * We only support multi-byte sequences where the first byte has the
3228 * high-bit set, so as an optimization we can avoid this block
3229 * entirely if it is not set.
3231 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3236 /* All our encodings only read the first byte to get the length */
3237 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
3238 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
3239 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3240 raw_buf_ptr += mblen - 1;
3242 first_char_in_line = false;
3243 } /* end of outer loop */
3246 * Transfer any still-uncopied data to line_buf.
3254 * Return decimal value for a hexadecimal digit
3257 GetDecimalFromHex(char hex)
3259 if (isdigit((unsigned char) hex))
3262 return tolower((unsigned char) hex) - 'a' + 10;
3266 * Parse the current line into separate attributes (fields),
3267 * performing de-escaping as needed.
3269 * The input is in line_buf. We use attribute_buf to hold the result
3270 * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3271 * string, or NULL when the input matches the null marker string.
3272 * This array is expanded as necessary.
3274 * (Note that the caller cannot check for nulls since the returned
3275 * string would be the post-de-escaping equivalent, which may look
3276 * the same as some valid data string.)
3278 * delim is the column delimiter string (must be just one byte for now).
3279 * null_print is the null marker string. Note that this is compared to
3280 * the pre-de-escaped input string.
3282 * The return value is the number of fields actually read.
3285 CopyReadAttributesText(CopyState cstate)
3287 char delimc = cstate->delim[0];
3294 * We need a special case for zero-column tables: check that the input
3295 * line is empty, and return.
3297 if (cstate->max_fields <= 0)
3299 if (cstate->line_buf.len != 0)
3301 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3302 errmsg("extra data after last expected column")));
3306 resetStringInfo(&cstate->attribute_buf);
3309 * The de-escaped attributes will certainly not be longer than the input
3310 * data line, so we can just force attribute_buf to be large enough and
3311 * then transfer data without any checks for enough space. We need to do
3312 * it this way because enlarging attribute_buf mid-stream would invalidate
3313 * pointers already stored into cstate->raw_fields[].
3315 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3316 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3317 output_ptr = cstate->attribute_buf.data;
3319 /* set pointer variables for loop */
3320 cur_ptr = cstate->line_buf.data;
3321 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3323 /* Outer loop iterates over fields */
3327 bool found_delim = false;
3331 bool saw_non_ascii = false;
3333 /* Make sure there is enough space for the next value */
3334 if (fieldno >= cstate->max_fields)
3336 cstate->max_fields *= 2;
3337 cstate->raw_fields =
3338 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3341 /* Remember start of field on both input and output sides */
3342 start_ptr = cur_ptr;
3343 cstate->raw_fields[fieldno] = output_ptr;
3346 * Scan data for field.
3348 * Note that in this loop, we are scanning to locate the end of field
3349 * and also speculatively performing de-escaping. Once we find the
3350 * end-of-field, we can match the raw field contents against the null
3351 * marker string. Only after that comparison fails do we know that
3352 * de-escaping is actually the right thing to do; therefore we *must
3353 * not* throw any syntax errors before we've done the null-marker
3361 if (cur_ptr >= line_end_ptr)
3371 if (cur_ptr >= line_end_ptr)
3389 if (cur_ptr < line_end_ptr)
3395 val = (val << 3) + OCTVALUE(c);
3396 if (cur_ptr < line_end_ptr)
3402 val = (val << 3) + OCTVALUE(c);
3408 if (c == '\0' || IS_HIGHBIT_SET(c))
3409 saw_non_ascii = true;
3414 if (cur_ptr < line_end_ptr)
3416 char hexchar = *cur_ptr;
3418 if (isxdigit((unsigned char) hexchar))
3420 int val = GetDecimalFromHex(hexchar);
3423 if (cur_ptr < line_end_ptr)
3426 if (isxdigit((unsigned char) hexchar))
3429 val = (val << 4) + GetDecimalFromHex(hexchar);
3433 if (c == '\0' || IS_HIGHBIT_SET(c))
3434 saw_non_ascii = true;
3458 * in all other cases, take the char after '\'
3464 /* Add c to output string */
3468 /* Check whether raw input matched null marker */
3469 input_len = end_ptr - start_ptr;
3470 if (input_len == cstate->null_print_len &&
3471 strncmp(start_ptr, cstate->null_print, input_len) == 0)
3472 cstate->raw_fields[fieldno] = NULL;
3476 * At this point we know the field is supposed to contain data.
3478 * If we de-escaped any non-7-bit-ASCII chars, make sure the
3479 * resulting string is valid data for the db encoding.
3483 char *fld = cstate->raw_fields[fieldno];
3485 pg_verifymbstr(fld, output_ptr - fld, false);
3489 /* Terminate attribute value in output area */
3490 *output_ptr++ = '\0';
3493 /* Done if we hit EOL instead of a delim */
3498 /* Clean up state of attribute_buf */
3500 Assert(*output_ptr == '\0');
3501 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
3507 * Parse the current line into separate attributes (fields),
3508 * performing de-escaping as needed. This has exactly the same API as
3509 * CopyReadAttributesText, except we parse the fields according to
3510 * "standard" (i.e. common) CSV usage.
3513 CopyReadAttributesCSV(CopyState cstate)
3515 char delimc = cstate->delim[0];
3516 char quotec = cstate->quote[0];
3517 char escapec = cstate->escape[0];
3524 * We need a special case for zero-column tables: check that the input
3525 * line is empty, and return.
3527 if (cstate->max_fields <= 0)
3529 if (cstate->line_buf.len != 0)
3531 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3532 errmsg("extra data after last expected column")));
3536 resetStringInfo(&cstate->attribute_buf);
3539 * The de-escaped attributes will certainly not be longer than the input
3540 * data line, so we can just force attribute_buf to be large enough and
3541 * then transfer data without any checks for enough space. We need to do
3542 * it this way because enlarging attribute_buf mid-stream would invalidate
3543 * pointers already stored into cstate->raw_fields[].
3545 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3546 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3547 output_ptr = cstate->attribute_buf.data;
3549 /* set pointer variables for loop */
3550 cur_ptr = cstate->line_buf.data;
3551 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3553 /* Outer loop iterates over fields */
3557 bool found_delim = false;
3558 bool saw_quote = false;
3563 /* Make sure there is enough space for the next value */
3564 if (fieldno >= cstate->max_fields)
3566 cstate->max_fields *= 2;
3567 cstate->raw_fields =
3568 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3571 /* Remember start of field on both input and output sides */
3572 start_ptr = cur_ptr;
3573 cstate->raw_fields[fieldno] = output_ptr;
3576 * Scan data for field,
3578 * The loop starts in "not quote" mode and then toggles between that
3579 * and "in quote" mode. The loop exits normally if it is in "not
3580 * quote" mode and a delimiter or line end is seen.
3590 if (cur_ptr >= line_end_ptr)
3593 /* unquoted field delimiter */
3599 /* start of quoted field (or part of field) */
3605 /* Add c to output string */
3613 if (cur_ptr >= line_end_ptr)
3615 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3616 errmsg("unterminated CSV quoted field")));
3620 /* escape within a quoted field */
3624 * peek at the next char if available, and escape it if it
3625 * is an escape char or a quote char
3627 if (cur_ptr < line_end_ptr)
3629 char nextc = *cur_ptr;
3631 if (nextc == escapec || nextc == quotec)
3633 *output_ptr++ = nextc;
3641 * end of quoted field. Must do this test after testing for
3642 * escape in case quote char and escape char are the same
3643 * (which is the common case).
3648 /* Add c to output string */
3654 /* Terminate attribute value in output area */
3655 *output_ptr++ = '\0';
3657 /* Check whether raw input matched null marker */
3658 input_len = end_ptr - start_ptr;
3659 if (!saw_quote && input_len == cstate->null_print_len &&
3660 strncmp(start_ptr, cstate->null_print, input_len) == 0)
3661 cstate->raw_fields[fieldno] = NULL;
3664 /* Done if we hit EOL instead of a delim */
3669 /* Clean up state of attribute_buf */
3671 Assert(*output_ptr == '\0');
3672 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
3679 * Read a binary attribute
3682 CopyReadBinaryAttribute(CopyState cstate,
3683 int column_no, FmgrInfo *flinfo,
3684 Oid typioparam, int32 typmod,
3690 if (!CopyGetInt32(cstate, &fld_size))
3692 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3693 errmsg("unexpected EOF in COPY data")));
3697 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
3701 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3702 errmsg("invalid field size")));
3704 /* reset attribute_buf to empty, and load raw data in it */
3705 resetStringInfo(&cstate->attribute_buf);
3707 enlargeStringInfo(&cstate->attribute_buf, fld_size);
3708 if (CopyGetData(cstate, cstate->attribute_buf.data,
3709 fld_size, fld_size) != fld_size)
3711 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3712 errmsg("unexpected EOF in COPY data")));
3714 cstate->attribute_buf.len = fld_size;
3715 cstate->attribute_buf.data[fld_size] = '\0';
3717 /* Call the column type's binary input converter */
3718 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
3719 typioparam, typmod);
3721 /* Trouble if it didn't eat the whole buffer */
3722 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
3724 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
3725 errmsg("incorrect binary data format")));
3732 * Send text representation of one attribute, with conversion and escaping
/*
 * DUMPSOFAR(): flush the pending literal run by sending the ptr - start
 * bytes beginning at start through CopySendData. It refers to the locals
 * cstate, start and ptr, so it can only be used where those are in scope.
 */
3734 #define DUMPSOFAR() \
3737 CopySendData(cstate, start, ptr - start); \
3741 CopyAttributeOutText(CopyState cstate, char *string)
/*
 * CopyAttributeOutText: write one attribute value in COPY text format.
 * Backslashes and the column delimiter are prefixed with a backslash.
 * Control characters (byte value < 0x20) are also escaped with a backslash.
 * Output goes through CopySendData/CopySendChar.
 */
/* Only the first byte of cstate->delim is used as the delimiter. */
3746 char delimc = cstate->delim[0];
/* Convert from the server encoding to cstate->file_encoding only when needed. */
3748 if (cstate->need_transcoding)
3749 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
3754 * We have to grovel through the string searching for control characters
3755 * and instances of the delimiter character. In most cases, though, these
3756 * are infrequent. To avoid overhead from calling CopySendData once per
3757 * character, we dump out all characters between escaped characters in a
3758 * single call. The loop invariant is that the data from "start" to "ptr"
3759 * can be sent literally, but hasn't yet been.
3761 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
3762 * in valid backend encodings, extra bytes of a multibyte character never
3763 * look like ASCII. This loop is sufficiently performance-critical that
3764 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
3765 * of the normal safe-encoding path.
/*
 * Slow path: the file encoding is flagged as embedding ASCII-looking bytes
 * inside multibyte characters. Each multibyte character is therefore skipped
 * as a unit with pg_encoding_mblen.
 */
3767 if (cstate->encoding_embeds_ascii)
3770 while ((c = *ptr) != '\0')
/* Control character: byte value below 0x20. */
3772 if ((unsigned char) c < (unsigned char) 0x20)
3775 * \r and \n must be escaped, the others are traditional. We
3776 * prefer to dump these using the C-like notation, rather than
3777 * a backslash and the literal character, because it makes the
3778 * dump file a bit more proof against Microsoftish data
3802 /* If it's the delimiter, must backslash it */
3805 /* All ASCII control chars are length 1 */
3807 continue; /* fall to end of loop */
3809 /* if we get here, we need to convert the control char */
3811 CopySendChar(cstate, '\\');
3812 CopySendChar(cstate, c);
3813 start = ++ptr; /* do not include char in next run */
/*
 * Backslash or delimiter: emit a backslash. The character itself becomes
 * the first byte of the next literal run.
 */
3815 else if (c == '\\' || c == delimc)
3818 CopySendChar(cstate, '\\');
3819 start = ptr++; /* we include char in next run */
/* High bit set: lead byte of a multibyte character, so skip the whole character. */
3821 else if (IS_HIGHBIT_SET(c))
3822 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
/*
 * Fast path for safe encodings. The escaping rules are the same, but there
 * is no IS_HIGHBIT_SET()/pg_encoding_mblen() step.
 */
3830 while ((c = *ptr) != '\0')
/* Control character: byte value below 0x20. */
3832 if ((unsigned char) c < (unsigned char) 0x20)
3835 * \r and \n must be escaped, the others are traditional. We
3836 * prefer to dump these using the C-like notation, rather than
3837 * a backslash and the literal character, because it makes the
3838 * dump file a bit more proof against Microsoftish data
3862 /* If it's the delimiter, must backslash it */
3865 /* All ASCII control chars are length 1 */
3867 continue; /* fall to end of loop */
3869 /* if we get here, we need to convert the control char */
3871 CopySendChar(cstate, '\\');
3872 CopySendChar(cstate, c);
3873 start = ++ptr; /* do not include char in next run */
/* Backslash or delimiter: backslash-escape it; keep the char in the next run. */
3875 else if (c == '\\' || c == delimc)
3878 CopySendChar(cstate, '\\');
3879 start = ptr++; /* we include char in next run */
3890 * Send text representation of one attribute, with conversion and
3891 * CSV-style escaping
3894 CopyAttributeOutCSV(CopyState cstate, char *string,
3895 bool use_quote, bool single_attr)
/*
 * Parameters:
 *   string      - attribute value in the server encoding.
 *   use_quote   - presumably the caller's request to quote the value
 *                 unconditionally (TODO confirm against callers). When it
 *                 is false, a value equal to cstate->null_print is still
 *                 treated as needing quotes.
 *   single_attr - this attribute appears alone on its line. A lone
 *                 backslash-dot value then gets quoted so that it is not
 *                 read back as the end-of-data marker.
 *
 * The delimiter, quote and escape characters are the first bytes of
 * cstate->delim, cstate->quote and cstate->escape respectively.
 */
3900 char delimc = cstate->delim[0];
3901 char quotec = cstate->quote[0];
3902 char escapec = cstate->escape[0];
3904 /* force quoting if it matches null_print (before conversion!) */
3905 if (!use_quote && strcmp(string, cstate->null_print) == 0)
/* Convert to cstate->file_encoding only when transcoding is needed. */
3908 if (cstate->need_transcoding)
3909 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
3914 * Make a preliminary pass to discover if it needs quoting
3919 * Because '\.' can be a data value, quote it if it appears alone on a
3920 * line so it is not interpreted as the end-of-data marker.
3922 if (single_attr && strcmp(ptr, "\\.") == 0)
/* Scan for the delimiter, the quote character, LF or CR. */
3928 while ((c = *tptr) != '\0')
3930 if (c == delimc || c == quotec || c == '\n' || c == '\r')
/* Step over multibyte characters whole when the encoding can embed ASCII bytes. */
3935 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3936 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
/* Quoted output: opening quote character. */
3945 CopySendChar(cstate, quotec);
3948 * We adopt the same optimization strategy as in CopyAttributeOutText
3951 while ((c = *ptr) != '\0')
/*
 * Quote and escape characters are prefixed with escapec. The character
 * itself starts the next literal run.
 */
3953 if (c == quotec || c == escapec)
3956 CopySendChar(cstate, escapec);
3957 start = ptr; /* we include char in next run */
/* Step over multibyte characters whole when the encoding can embed ASCII bytes. */
3959 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3960 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
/* Closing quote character. */
3966 CopySendChar(cstate, quotec);
3970 /* If it doesn't need quoting, we can just dump it as-is */
3971 CopySendString(cstate, ptr);
3976 * CopyGetAttnums - build an integer list of attnums to be copied
3978 * The input attnamelist is either the user-specified column list,
3979 * or NIL if there was none (in which case we want all the non-dropped
3982 * rel can be NULL ... it's only used for error reports.
/*
 * Returns a List built with lappend_int. Errors raised:
 *   ERRCODE_UNDEFINED_COLUMN  - a listed name matches no column.
 *   ERRCODE_DUPLICATE_COLUMN  - a listed name resolves to a column
 *                               that is already in the result.
 */
3985 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
3987 List *attnums = NIL;
3989 if (attnamelist == NIL)
3991 /* Generate default column list */
3992 Form_pg_attribute *attr = tupDesc->attrs;
3993 int attr_count = tupDesc->natts;
3996 for (i = 0; i < attr_count; i++)
/* Dropped columns are excluded; others are numbered i + 1 (1-based position). */
3998 if (attr[i]->attisdropped)
4000 attnums = lappend_int(attnums, i + 1);
4005 /* Validate the user-supplied list and extract attnums */
4008 foreach(l, attnamelist)
4010 char *name = strVal(lfirst(l));
4014 /* Lookup column name */
4015 attnum = InvalidAttrNumber;
/*
 * Linear scan of tupDesc->attrs. attisdropped is tested before attname is
 * compared with namestrcmp. On a match, attnum takes the column's own attnum.
 */
4016 for (i = 0; i < tupDesc->natts; i++)
4018 if (tupDesc->attrs[i]->attisdropped)
4020 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4022 attnum = tupDesc->attrs[i]->attnum;
/*
 * Unknown column. The message names the relation when rel is available;
 * otherwise a generic message is used, since rel may be NULL.
 */
4026 if (attnum == InvalidAttrNumber)
4030 (errcode(ERRCODE_UNDEFINED_COLUMN),
4031 errmsg("column \"%s\" of relation \"%s\" does not exist",
4032 name, RelationGetRelationName(rel))));
4035 (errcode(ERRCODE_UNDEFINED_COLUMN),
4036 errmsg("column \"%s\" does not exist",
4039 /* Check for duplicates */
4040 if (list_member_int(attnums, attnum))
4042 (errcode(ERRCODE_DUPLICATE_COLUMN),
4043 errmsg("column \"%s\" specified more than once",
/* Appended in the order the user listed the columns. */
4045 attnums = lappend_int(attnums, attnum);
4054 * copy_dest_startup --- executor startup
/* Installed as the rStartup callback by CreateCopyDestReceiver. */
4057 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4063 * copy_dest_receive --- receive one tuple
/*
 * Installed as the receiveSlot callback by CreateCopyDestReceiver. It
 * extracts every attribute of the slot and writes the row with
 * CopyOneRowTo, passing InvalidOid for the OID argument. It then counts
 * the row in myState->processed.
 */
4066 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4068 DR_copy *myState = (DR_copy *) self;
4069 CopyState cstate = myState->cstate;
4071 /* Make sure the tuple is fully deconstructed */
4072 slot_getallattrs(slot);
4074 /* And send the data */
4075 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4076 myState->processed++;
4080 * copy_dest_shutdown --- executor end
/* Installed as the rShutdown callback by CreateCopyDestReceiver. */
4083 copy_dest_shutdown(DestReceiver *self)
4089 * copy_dest_destroy --- release DestReceiver object
/* Installed as the rDestroy callback by CreateCopyDestReceiver. */
4092 copy_dest_destroy(DestReceiver *self)
4098 * CreateCopyDestReceiver -- create a suitable DestReceiver object
/*
 * Allocates a DR_copy with palloc, installs the copy_dest_* callbacks and
 * marks it as DestCopyOut. The processed counter starts at zero. cstate is
 * left NULL; the caller must assign it before any tuple is received,
 * because copy_dest_receive reads it.
 */
4101 CreateCopyDestReceiver(void)
4103 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4105 self->pub.receiveSlot = copy_dest_receive;
4106 self->pub.rStartup = copy_dest_startup;
4107 self->pub.rShutdown = copy_dest_shutdown;
4108 self->pub.rDestroy = copy_dest_destroy;
4109 self->pub.mydest = DestCopyOut;
4111 self->cstate = NULL; /* will be set later */
4112 self->processed = 0;
4114 return (DestReceiver *) self;