1 /*-------------------------------------------------------------------------
4 * Implements the COPY utility command
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/commands/copy.c
13 *-------------------------------------------------------------------------
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "rewrite/rewriteHandler.h"
41 #include "storage/fd.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/memutils.h"
46 #include "utils/portal.h"
47 #include "utils/rel.h"
48 #include "utils/rls.h"
49 #include "utils/snapmgr.h"
52 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
53 #define OCTVALUE(c) ((c) - '0')
56 * Represents the different source/dest cases we need to worry about at
61 COPY_FILE, /* to/from file (or a piped program) */
62 COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
63 COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
64 COPY_CALLBACK /* to/from callback function */
68 * Represents the end-of-line terminator type of the input
79 * This struct contains all the state variables used throughout a COPY
80 * operation. For simplicity, we use the same struct for all variants of COPY,
81 * even though some fields are used in only some cases.
83 * Multi-byte encodings: all supported client-side encodings encode multi-byte
84 * characters by having the first byte's high bit set. Subsequent bytes of the
85 * character can have the high bit not set. When scanning data in such an
86 * encoding to look for a match to a single-byte (ie ASCII) character, we must
87 * use the full pg_encoding_mblen() machinery to skip over multibyte
88 * characters, else we might find a false match to a trailing byte. In
89 * supported server encodings, there is no possibility of a false match, and
90 * it's faster to make useless comparisons to trailing bytes than it is to
91 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
92 * when we have to do it the hard way.
94 typedef struct CopyStateData
96 /* low-level state data */
97 CopyDest copy_dest; /* type of copy source/destination */
98 FILE *copy_file; /* used if copy_dest == COPY_FILE */
99 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
100 * dest == COPY_NEW_FE in COPY FROM */
101 bool fe_eof; /* true if detected end of copy data */
102 EolType eol_type; /* EOL type of input */
103 int file_encoding; /* file or remote side's character encoding */
104 bool need_transcoding; /* file encoding diff from server? */
105 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
107 /* parameters from the COPY command */
108 Relation rel; /* relation to copy to or from */
109 QueryDesc *queryDesc; /* executable query to copy from */
110 List *attnumlist; /* integer list of attnums to copy */
111 char *filename; /* filename, or NULL for STDIN/STDOUT */
112 bool is_program; /* is 'filename' a program to popen? */
113 copy_data_source_cb data_source_cb; /* function for reading data*/
114 bool binary; /* binary format? */
115 bool oids; /* include OIDs? */
116 bool freeze; /* freeze rows on loading? */
117 bool csv_mode; /* Comma Separated Value format? */
118 bool header_line; /* CSV header line? */
119 char *null_print; /* NULL marker string (server encoding!) */
120 int null_print_len; /* length of same */
121 char *null_print_client; /* same converted to file encoding */
122 char *delim; /* column delimiter (must be 1 byte) */
123 char *quote; /* CSV quote char (must be 1 byte) */
124 char *escape; /* CSV escape char (must be 1 byte) */
125 List *force_quote; /* list of column names */
126 bool force_quote_all; /* FORCE_QUOTE *? */
127 bool *force_quote_flags; /* per-column CSV FQ flags */
128 List *force_notnull; /* list of column names */
129 bool *force_notnull_flags; /* per-column CSV FNN flags */
130 List *force_null; /* list of column names */
131 bool *force_null_flags; /* per-column CSV FN flags */
132 bool convert_selectively; /* do selective binary conversion? */
133 List *convert_select; /* list of column names (can be NIL) */
134 bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
136 /* these are just for error messages, see CopyFromErrorCallback */
137 const char *cur_relname; /* table name for error messages */
138 int cur_lineno; /* line number for error messages */
139 const char *cur_attname; /* current att for error messages */
140 const char *cur_attval; /* current att value for error messages */
143 * Working state for COPY TO/FROM
145 MemoryContext copycontext; /* per-copy execution context */
148 * Working state for COPY TO
150 FmgrInfo *out_functions; /* lookup info for output functions */
151 MemoryContext rowcontext; /* per-row evaluation context */
154 * Working state for COPY FROM
156 AttrNumber num_defaults;
158 FmgrInfo oid_in_function;
160 FmgrInfo *in_functions; /* array of input functions for each attrs */
161 Oid *typioparams; /* array of element types for in_functions */
162 int *defmap; /* array of default att numbers */
163 ExprState **defexprs; /* array of default att expressions */
164 bool volatile_defexprs; /* is any of defexprs volatile? */
167 PartitionDispatch *partition_dispatch_info;
168 int num_dispatch; /* Number of entries in the above array */
169 int num_partitions; /* Number of members in the following arrays */
170 ResultRelInfo *partitions; /* Per partition result relation */
171 TupleConversionMap **partition_tupconv_maps;
172 TupleTableSlot *partition_tuple_slot;
175 * These variables are used to reduce overhead in textual COPY FROM.
177 * attribute_buf holds the separated, de-escaped text for each field of
178 * the current line. The CopyReadAttributes functions return arrays of
179 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
180 * the buffer on each cycle.
182 StringInfoData attribute_buf;
184 /* field raw data pointers found by COPY FROM */
190 * Similarly, line_buf holds the whole input line being processed. The
191 * input cycle is first to read the whole line into line_buf, convert it
192 * to server encoding there, and then extract the individual attribute
193 * fields into attribute_buf. line_buf is preserved unmodified so that we
194 * can display it in error messages if appropriate.
196 StringInfoData line_buf;
197 bool line_buf_converted; /* converted to server encoding? */
198 bool line_buf_valid; /* contains the row being processed? */
201 * Finally, raw_buf holds raw data read from the data source (file or
202 * client connection). CopyReadLine parses this data sufficiently to
203 * locate line boundaries, then transfers the data to line_buf and
204 * converts it. Note: we guarantee that there is a \0 at
205 * raw_buf[raw_buf_len].
207 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
209 int raw_buf_index; /* next byte to process */
210 int raw_buf_len; /* total # of bytes stored */
213 /* DestReceiver for COPY (query) TO */
216 DestReceiver pub; /* publicly-known function pointers */
217 CopyState cstate; /* CopyStateData for the command */
218 uint64 processed; /* # of tuples processed */
223 * These macros centralize code used to process line_buf and raw_buf buffers.
224 * They are macros because they often do continue/break control and to avoid
225 * function call overhead in tight COPY loops.
227 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
228 * prevent the continue/break processing from working. We end the "if (1)"
229 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
230 * any "else" in the calling code, and to avoid any compiler warnings about
231 * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
235 * This keeps the character read at the top of the loop in the buffer
236 * even if there is more than one read-ahead.
238 #define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
241 if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
243 raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
249 /* This consumes the remainder of the buffer and breaks */
250 #define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
253 if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
256 raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
257 /* backslash just before EOF, treat as data char */ \
264 * Transfer any approved data to line_buf; must do this to be sure
265 * there is some room in raw_buf.
267 #define REFILL_LINEBUF \
270 if (raw_buf_ptr > cstate->raw_buf_index) \
272 appendBinaryStringInfo(&cstate->line_buf, \
273 cstate->raw_buf + cstate->raw_buf_index, \
274 raw_buf_ptr - cstate->raw_buf_index); \
275 cstate->raw_buf_index = raw_buf_ptr; \
279 /* Undo any read-ahead and jump out of the block. */
280 #define NO_END_OF_COPY_GOTO \
283 raw_buf_ptr = prev_raw_ptr + 1; \
284 goto not_end_of_copy; \
287 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
290 /* non-export function prototypes */
291 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
292 RawStmt *raw_query, Oid queryRelId, List *attnamelist,
294 static void EndCopy(CopyState cstate);
295 static void ClosePipeToProgram(CopyState cstate);
296 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
297 Oid queryRelId, const char *filename, bool is_program,
298 List *attnamelist, List *options);
299 static void EndCopyTo(CopyState cstate);
300 static uint64 DoCopyTo(CopyState cstate);
301 static uint64 CopyTo(CopyState cstate);
302 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
303 Datum *values, bool *nulls);
304 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
305 CommandId mycid, int hi_options,
306 ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
307 BulkInsertState bistate,
308 int nBufferedTuples, HeapTuple *bufferedTuples,
309 int firstBufferedLineNo);
310 static bool CopyReadLine(CopyState cstate);
311 static bool CopyReadLineText(CopyState cstate);
312 static int CopyReadAttributesText(CopyState cstate);
313 static int CopyReadAttributesCSV(CopyState cstate);
314 static Datum CopyReadBinaryAttribute(CopyState cstate,
315 int column_no, FmgrInfo *flinfo,
316 Oid typioparam, int32 typmod,
318 static void CopyAttributeOutText(CopyState cstate, char *string);
319 static void CopyAttributeOutCSV(CopyState cstate, char *string,
320 bool use_quote, bool single_attr);
321 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
323 static char *limit_printout_length(const char *str);
325 /* Low-level communications functions */
326 static void SendCopyBegin(CopyState cstate);
327 static void ReceiveCopyBegin(CopyState cstate);
328 static void SendCopyEnd(CopyState cstate);
329 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
330 static void CopySendString(CopyState cstate, const char *str);
331 static void CopySendChar(CopyState cstate, char c);
332 static void CopySendEndOfRow(CopyState cstate);
333 static int CopyGetData(CopyState cstate, void *databuf,
334 int minread, int maxread);
335 static void CopySendInt32(CopyState cstate, int32 val);
336 static bool CopyGetInt32(CopyState cstate, int32 *val);
337 static void CopySendInt16(CopyState cstate, int16 val);
338 static bool CopyGetInt16(CopyState cstate, int16 *val);
342 * Send copy start/stop messages for frontend copies. These have changed
343 * in past protocol redesigns.
346 SendCopyBegin(CopyState cstate)
348 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
352 int natts = list_length(cstate->attnumlist);
353 int16 format = (cstate->binary ? 1 : 0);
356 pq_beginmessage(&buf, 'H');
357 pq_sendbyte(&buf, format); /* overall format */
358 pq_sendint(&buf, natts, 2);
359 for (i = 0; i < natts; i++)
360 pq_sendint(&buf, format, 2); /* per-column formats */
362 cstate->copy_dest = COPY_NEW_FE;
369 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
370 errmsg("COPY BINARY is not supported to stdout or from stdin")));
371 pq_putemptymessage('H');
372 /* grottiness needed for old COPY OUT protocol */
374 cstate->copy_dest = COPY_OLD_FE;
379 ReceiveCopyBegin(CopyState cstate)
381 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
385 int natts = list_length(cstate->attnumlist);
386 int16 format = (cstate->binary ? 1 : 0);
389 pq_beginmessage(&buf, 'G');
390 pq_sendbyte(&buf, format); /* overall format */
391 pq_sendint(&buf, natts, 2);
392 for (i = 0; i < natts; i++)
393 pq_sendint(&buf, format, 2); /* per-column formats */
395 cstate->copy_dest = COPY_NEW_FE;
396 cstate->fe_msgbuf = makeLongStringInfo();
403 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
404 errmsg("COPY BINARY is not supported to stdout or from stdin")));
405 pq_putemptymessage('G');
406 /* any error in old protocol will make us lose sync */
408 cstate->copy_dest = COPY_OLD_FE;
410 /* We *must* flush here to ensure FE knows it can send. */
415 SendCopyEnd(CopyState cstate)
417 if (cstate->copy_dest == COPY_NEW_FE)
419 /* Shouldn't have any unsent data */
420 Assert(cstate->fe_msgbuf->len == 0);
421 /* Send Copy Done message */
422 pq_putemptymessage('c');
426 CopySendData(cstate, "\\.", 2);
427 /* Need to flush out the trailer (this also appends a newline) */
428 CopySendEndOfRow(cstate);
429 pq_endcopyout(false);
434 * CopySendData sends output data to the destination (file or frontend)
435 * CopySendString does the same for null-terminated strings
436 * CopySendChar does the same for single characters
437 * CopySendEndOfRow does the appropriate thing at end of each data row
438 * (data is not actually flushed except by CopySendEndOfRow)
440 * NB: no data conversion is applied by these functions
444 CopySendData(CopyState cstate, const void *databuf, int datasize)
446 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
450 CopySendString(CopyState cstate, const char *str)
452 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
456 CopySendChar(CopyState cstate, char c)
458 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
462 CopySendEndOfRow(CopyState cstate)
464 StringInfo fe_msgbuf = cstate->fe_msgbuf;
466 switch (cstate->copy_dest)
471 /* Default line termination depends on platform */
473 CopySendChar(cstate, '\n');
475 CopySendString(cstate, "\r\n");
479 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
480 cstate->copy_file) != 1 ||
481 ferror(cstate->copy_file))
483 if (cstate->is_program)
488 * The pipe will be closed automatically on error at
489 * the end of transaction, but we might get a better
490 * error message from the subprocess' exit code than
493 ClosePipeToProgram(cstate);
496 * If ClosePipeToProgram() didn't throw an error, the
497 * program terminated normally, but closed the pipe
498 * first. Restore errno, and throw an error.
503 (errcode_for_file_access(),
504 errmsg("could not write to COPY program: %m")));
508 (errcode_for_file_access(),
509 errmsg("could not write to COPY file: %m")));
513 /* The FE/BE protocol uses \n as newline for all platforms */
515 CopySendChar(cstate, '\n');
517 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
519 /* no hope of recovering connection sync, so FATAL */
521 (errcode(ERRCODE_CONNECTION_FAILURE),
522 errmsg("connection lost during COPY to stdout")));
526 /* The FE/BE protocol uses \n as newline for all platforms */
528 CopySendChar(cstate, '\n');
530 /* Dump the accumulated row as one CopyData message */
531 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
534 Assert(false); /* Not yet supported. */
538 resetStringInfo(fe_msgbuf);
542 * CopyGetData reads data from the source (file or frontend)
544 * We attempt to read at least minread, and at most maxread, bytes from
545 * the source. The actual number of bytes read is returned; if this is
546 * less than minread, EOF was detected.
548 * Note: when copying from the frontend, we expect a proper EOF mark per
549 * protocol; if the frontend simply drops the connection, we raise error.
550 * It seems unwise to allow the COPY IN to complete normally in that case.
552 * NB: no data conversion is applied here.
555 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
559 switch (cstate->copy_dest)
562 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
563 if (ferror(cstate->copy_file))
565 (errcode_for_file_access(),
566 errmsg("could not read from COPY file: %m")));
571 * We cannot read more than minread bytes (which in practice is 1)
572 * because old protocol doesn't have any clear way of separating
573 * the COPY stream from following data. This is slow, but not any
574 * slower than the code path was originally, and we don't care
575 * much anymore about the performance of old protocol.
577 if (pq_getbytes((char *) databuf, minread))
579 /* Only a \. terminator is legal EOF in old protocol */
581 (errcode(ERRCODE_CONNECTION_FAILURE),
582 errmsg("unexpected EOF on client connection with an open transaction")));
587 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
591 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
593 /* Try to receive another message */
597 HOLD_CANCEL_INTERRUPTS();
599 mtype = pq_getbyte();
602 (errcode(ERRCODE_CONNECTION_FAILURE),
603 errmsg("unexpected EOF on client connection with an open transaction")));
604 if (pq_getmessage(cstate->fe_msgbuf, 0))
606 (errcode(ERRCODE_CONNECTION_FAILURE),
607 errmsg("unexpected EOF on client connection with an open transaction")));
608 RESUME_CANCEL_INTERRUPTS();
611 case 'd': /* CopyData */
613 case 'c': /* CopyDone */
614 /* COPY IN correctly terminated by frontend */
615 cstate->fe_eof = true;
617 case 'f': /* CopyFail */
619 (errcode(ERRCODE_QUERY_CANCELED),
620 errmsg("COPY from stdin failed: %s",
621 pq_getmsgstring(cstate->fe_msgbuf))));
623 case 'H': /* Flush */
627 * Ignore Flush/Sync for the convenience of client
628 * libraries (such as libpq) that may send those
629 * without noticing that the command they just
635 (errcode(ERRCODE_PROTOCOL_VIOLATION),
636 errmsg("unexpected message type 0x%02X during COPY from stdin",
641 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
644 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
645 databuf = (void *) ((char *) databuf + avail);
651 bytesread = cstate->data_source_cb(databuf, minread, maxread);
660 * These functions do apply some data conversion
664 * CopySendInt32 sends an int32 in network byte order
667 CopySendInt32(CopyState cstate, int32 val)
671 buf = htonl((uint32) val);
672 CopySendData(cstate, &buf, sizeof(buf));
676 * CopyGetInt32 reads an int32 that appears in network byte order
678 * Returns true if OK, false if EOF
681 CopyGetInt32(CopyState cstate, int32 *val)
685 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
687 *val = 0; /* suppress compiler warning */
690 *val = (int32) ntohl(buf);
695 * CopySendInt16 sends an int16 in network byte order
698 CopySendInt16(CopyState cstate, int16 val)
702 buf = htons((uint16) val);
703 CopySendData(cstate, &buf, sizeof(buf));
707 * CopyGetInt16 reads an int16 that appears in network byte order
710 CopyGetInt16(CopyState cstate, int16 *val)
714 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
716 *val = 0; /* suppress compiler warning */
719 *val = (int16) ntohs(buf);
725 * CopyLoadRawBuf loads some more data into raw_buf
727 * Returns TRUE if able to obtain at least one more byte, else FALSE.
729 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
730 * down to the start of the buffer and then we load more data after that.
731 * This case is used only when a frontend multibyte character crosses a
732 * bufferload boundary.
735 CopyLoadRawBuf(CopyState cstate)
740 if (cstate->raw_buf_index < cstate->raw_buf_len)
742 /* Copy down the unprocessed data */
743 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
744 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
748 nbytes = 0; /* no data need be saved */
750 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
751 1, RAW_BUF_SIZE - nbytes);
753 cstate->raw_buf[nbytes] = '\0';
754 cstate->raw_buf_index = 0;
755 cstate->raw_buf_len = nbytes;
756 return (inbytes > 0);
761 * DoCopy executes the SQL COPY statement
763 * Either unload or reload contents of table <relation>, depending on <from>.
764 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
765 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
768 * If <pipe> is false, transfer is between the table and the file named
769 * <filename>. Otherwise, transfer is between the table and our regular
770 * input/output stream. The latter could be either stdin/stdout or a
771 * socket, depending on whether we're running under Postmaster control.
773 * Do not allow a Postgres user without superuser privilege to read from
774 * or write to a file.
776 * Do not allow the copy if user doesn't have proper permission to access
777 * the table or the specifically requested columns.
780 DoCopy(ParseState *pstate, const CopyStmt *stmt,
781 int stmt_location, int stmt_len,
785 bool is_from = stmt->is_from;
786 bool pipe = (stmt->filename == NULL);
789 RawStmt *query = NULL;
790 List *range_table = NIL;
792 /* Disallow COPY to/from file or program except to superusers. */
793 if (!pipe && !superuser())
795 if (stmt->is_program)
797 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
798 errmsg("must be superuser to COPY to or from an external program"),
799 errhint("Anyone can COPY to stdout or from stdin. "
800 "psql's \\copy command also works for anyone.")));
803 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
804 errmsg("must be superuser to COPY to or from a file"),
805 errhint("Anyone can COPY to stdout or from stdin. "
806 "psql's \\copy command also works for anyone.")));
812 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
817 Assert(!stmt->query);
819 /* Open and lock the relation, using the appropriate lock type. */
820 rel = heap_openrv(stmt->relation,
821 (is_from ? RowExclusiveLock : AccessShareLock));
823 relid = RelationGetRelid(rel);
825 rte = makeNode(RangeTblEntry);
826 rte->rtekind = RTE_RELATION;
827 rte->relid = RelationGetRelid(rel);
828 rte->relkind = rel->rd_rel->relkind;
829 rte->requiredPerms = required_access;
830 range_table = list_make1(rte);
832 tupDesc = RelationGetDescr(rel);
833 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
834 foreach(cur, attnums)
836 int attno = lfirst_int(cur) -
837 FirstLowInvalidHeapAttributeNumber;
840 rte->insertedCols = bms_add_member(rte->insertedCols, attno);
842 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
844 ExecCheckRTPerms(range_table, true);
847 * Permission check for row security policies.
849 * check_enable_rls will ereport(ERROR) if the user has requested
850 * something invalid and will otherwise indicate if we should enable
851 * RLS (returns RLS_ENABLED) or not for this COPY statement.
853 * If the relation has a row security policy and we are to apply it
854 * then perform a "query" copy and allow the normal query processing
855 * to handle the policies.
857 * If RLS is not enabled for this, then just fall through to the
858 * normal non-filtering relation handling.
860 if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
866 List *targetList = NIL;
870 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
871 errmsg("COPY FROM not supported with row-level security"),
872 errhint("Use INSERT statements instead.")));
877 * If no columns are specified in the attribute list of the COPY
878 * command, then the target list is 'all' columns. Therefore, '*'
879 * should be used as the target list for the resulting SELECT
882 * In the case that columns are specified in the attribute list,
883 * create a ColumnRef and ResTarget for each column and add them
884 * to the target list for the resulting SELECT statement.
888 cr = makeNode(ColumnRef);
889 cr->fields = list_make1(makeNode(A_Star));
892 target = makeNode(ResTarget);
894 target->indirection = NIL;
895 target->val = (Node *) cr;
896 target->location = -1;
898 targetList = list_make1(target);
904 foreach(lc, stmt->attlist)
907 * Build the ColumnRef for each column. The ColumnRef
908 * 'fields' property is a String 'Value' node (see
909 * nodes/value.h) that corresponds to the column name
912 cr = makeNode(ColumnRef);
913 cr->fields = list_make1(lfirst(lc));
916 /* Build the ResTarget and add the ColumnRef to it. */
917 target = makeNode(ResTarget);
919 target->indirection = NIL;
920 target->val = (Node *) cr;
921 target->location = -1;
923 /* Add each column to the SELECT statement's target list */
924 targetList = lappend(targetList, target);
929 * Build RangeVar for from clause, fully qualified based on the
930 * relation which we have opened and locked.
932 from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
933 pstrdup(RelationGetRelationName(rel)),
937 select = makeNode(SelectStmt);
938 select->targetList = targetList;
939 select->fromClause = list_make1(from);
941 query = makeNode(RawStmt);
942 query->stmt = (Node *) select;
943 query->stmt_location = stmt_location;
944 query->stmt_len = stmt_len;
947 * Close the relation for now, but keep the lock on it to prevent
948 * changes between now and when we start the query-based COPY.
950 * We'll reopen it later as part of the query-based COPY.
952 heap_close(rel, NoLock);
960 query = makeNode(RawStmt);
961 query->stmt = stmt->query;
962 query->stmt_location = stmt_location;
963 query->stmt_len = stmt_len;
973 /* check read-only transaction and parallel mode */
974 if (XactReadOnly && !rel->rd_islocaltemp)
975 PreventCommandIfReadOnly("COPY FROM");
976 PreventCommandIfParallelMode("COPY FROM");
978 cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
979 NULL, stmt->attlist, stmt->options);
980 cstate->range_table = range_table;
981 *processed = CopyFrom(cstate); /* copy from file to database */
986 cstate = BeginCopyTo(pstate, rel, query, relid,
987 stmt->filename, stmt->is_program,
988 stmt->attlist, stmt->options);
989 *processed = DoCopyTo(cstate); /* copy from database to file */
994 * Close the relation. If reading, we can release the AccessShareLock we
995 * got; if writing, we should hold the lock until end of transaction to
996 * ensure that updates will be committed before lock is released.
999 heap_close(rel, (is_from ? NoLock : AccessShareLock));
1003 * Process the statement option list for COPY.
1005 * Scan the options list (a list of DefElem) and transpose the information
1006 * into cstate, applying appropriate error checking.
1008 * cstate is assumed to be filled with zeroes initially.
1010 * This is exported so that external users of the COPY API can sanity-check
1011 * a list of options. In that usage, cstate should be passed as NULL
1012 * (since external users don't know sizeof(CopyStateData)) and the collected
1013 * data is just leaked until CurrentMemoryContext is reset.
1015 * Note that additional checking, such as whether column names listed in FORCE
1016 * QUOTE actually exist, has to be applied later. This just checks for
1017 * self-consistency of the options list.
1020 ProcessCopyOptions(ParseState *pstate,
1025 bool format_specified = false;
1028 /* Support external use for option sanity checking */
1030 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1032 cstate->file_encoding = -1;
1034 /* Extract options from the statement node tree */
1035 foreach(option, options)
1037 DefElem *defel = castNode(DefElem, lfirst(option));
1039 if (strcmp(defel->defname, "format") == 0)
1041 char *fmt = defGetString(defel);
1043 if (format_specified)
1045 (errcode(ERRCODE_SYNTAX_ERROR),
1046 errmsg("conflicting or redundant options"),
1047 parser_errposition(pstate, defel->location)));
1048 format_specified = true;
1049 if (strcmp(fmt, "text") == 0)
1050 /* default format */ ;
1051 else if (strcmp(fmt, "csv") == 0)
1052 cstate->csv_mode = true;
1053 else if (strcmp(fmt, "binary") == 0)
1054 cstate->binary = true;
1057 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1058 errmsg("COPY format \"%s\" not recognized", fmt),
1059 parser_errposition(pstate, defel->location)));
1061 else if (strcmp(defel->defname, "oids") == 0)
1065 (errcode(ERRCODE_SYNTAX_ERROR),
1066 errmsg("conflicting or redundant options"),
1067 parser_errposition(pstate, defel->location)));
1068 cstate->oids = defGetBoolean(defel);
1070 else if (strcmp(defel->defname, "freeze") == 0)
1074 (errcode(ERRCODE_SYNTAX_ERROR),
1075 errmsg("conflicting or redundant options"),
1076 parser_errposition(pstate, defel->location)));
1077 cstate->freeze = defGetBoolean(defel);
1079 else if (strcmp(defel->defname, "delimiter") == 0)
1083 (errcode(ERRCODE_SYNTAX_ERROR),
1084 errmsg("conflicting or redundant options"),
1085 parser_errposition(pstate, defel->location)));
1086 cstate->delim = defGetString(defel);
1088 else if (strcmp(defel->defname, "null") == 0)
1090 if (cstate->null_print)
1092 (errcode(ERRCODE_SYNTAX_ERROR),
1093 errmsg("conflicting or redundant options"),
1094 parser_errposition(pstate, defel->location)));
1095 cstate->null_print = defGetString(defel);
1097 else if (strcmp(defel->defname, "header") == 0)
1099 if (cstate->header_line)
1101 (errcode(ERRCODE_SYNTAX_ERROR),
1102 errmsg("conflicting or redundant options"),
1103 parser_errposition(pstate, defel->location)));
1104 cstate->header_line = defGetBoolean(defel);
1106 else if (strcmp(defel->defname, "quote") == 0)
1110 (errcode(ERRCODE_SYNTAX_ERROR),
1111 errmsg("conflicting or redundant options"),
1112 parser_errposition(pstate, defel->location)));
1113 cstate->quote = defGetString(defel);
1115 else if (strcmp(defel->defname, "escape") == 0)
1119 (errcode(ERRCODE_SYNTAX_ERROR),
1120 errmsg("conflicting or redundant options"),
1121 parser_errposition(pstate, defel->location)));
1122 cstate->escape = defGetString(defel);
1124 else if (strcmp(defel->defname, "force_quote") == 0)
1126 if (cstate->force_quote || cstate->force_quote_all)
1128 (errcode(ERRCODE_SYNTAX_ERROR),
1129 errmsg("conflicting or redundant options"),
1130 parser_errposition(pstate, defel->location)));
1131 if (defel->arg && IsA(defel->arg, A_Star))
1132 cstate->force_quote_all = true;
1133 else if (defel->arg && IsA(defel->arg, List))
1134 cstate->force_quote = castNode(List, defel->arg);
1137 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1138 errmsg("argument to option \"%s\" must be a list of column names",
1140 parser_errposition(pstate, defel->location)));
1142 else if (strcmp(defel->defname, "force_not_null") == 0)
1144 if (cstate->force_notnull)
1146 (errcode(ERRCODE_SYNTAX_ERROR),
1147 errmsg("conflicting or redundant options"),
1148 parser_errposition(pstate, defel->location)));
1149 if (defel->arg && IsA(defel->arg, List))
1150 cstate->force_notnull = castNode(List, defel->arg);
1153 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1154 errmsg("argument to option \"%s\" must be a list of column names",
1156 parser_errposition(pstate, defel->location)));
1158 else if (strcmp(defel->defname, "force_null") == 0)
1160 if (cstate->force_null)
1162 (errcode(ERRCODE_SYNTAX_ERROR),
1163 errmsg("conflicting or redundant options")));
1164 if (defel->arg && IsA(defel->arg, List))
1165 cstate->force_null = castNode(List, defel->arg);
1168 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1169 errmsg("argument to option \"%s\" must be a list of column names",
1171 parser_errposition(pstate, defel->location)));
1173 else if (strcmp(defel->defname, "convert_selectively") == 0)
1176 * Undocumented, not-accessible-from-SQL option: convert only the
1177 * named columns to binary form, storing the rest as NULLs. It's
1178 * allowed for the column list to be NIL.
1180 if (cstate->convert_selectively)
1182 (errcode(ERRCODE_SYNTAX_ERROR),
1183 errmsg("conflicting or redundant options"),
1184 parser_errposition(pstate, defel->location)));
1185 cstate->convert_selectively = true;
1186 if (defel->arg == NULL || IsA(defel->arg, List))
1187 cstate->convert_select = castNode(List, defel->arg);
1190 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1191 errmsg("argument to option \"%s\" must be a list of column names",
1193 parser_errposition(pstate, defel->location)));
1195 else if (strcmp(defel->defname, "encoding") == 0)
1197 if (cstate->file_encoding >= 0)
1199 (errcode(ERRCODE_SYNTAX_ERROR),
1200 errmsg("conflicting or redundant options"),
1201 parser_errposition(pstate, defel->location)));
1202 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1203 if (cstate->file_encoding < 0)
1205 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1206 errmsg("argument to option \"%s\" must be a valid encoding name",
1208 parser_errposition(pstate, defel->location)));
1212 (errcode(ERRCODE_SYNTAX_ERROR),
1213 errmsg("option \"%s\" not recognized",
1215 parser_errposition(pstate, defel->location)));
1219 * Check for incompatible options (must do these two before inserting
1222 if (cstate->binary && cstate->delim)
1224 (errcode(ERRCODE_SYNTAX_ERROR),
1225 errmsg("cannot specify DELIMITER in BINARY mode")));
1227 if (cstate->binary && cstate->null_print)
1229 (errcode(ERRCODE_SYNTAX_ERROR),
1230 errmsg("cannot specify NULL in BINARY mode")));
1232 /* Set defaults for omitted options */
1234 cstate->delim = cstate->csv_mode ? "," : "\t";
1236 if (!cstate->null_print)
1237 cstate->null_print = cstate->csv_mode ? "" : "\\N";
1238 cstate->null_print_len = strlen(cstate->null_print);
1240 if (cstate->csv_mode)
1243 cstate->quote = "\"";
1244 if (!cstate->escape)
1245 cstate->escape = cstate->quote;
1248 /* Only single-byte delimiter strings are supported. */
1249 if (strlen(cstate->delim) != 1)
1251 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1252 errmsg("COPY delimiter must be a single one-byte character")));
1254 /* Disallow end-of-line characters */
1255 if (strchr(cstate->delim, '\r') != NULL ||
1256 strchr(cstate->delim, '\n') != NULL)
1258 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1259 errmsg("COPY delimiter cannot be newline or carriage return")));
1261 if (strchr(cstate->null_print, '\r') != NULL ||
1262 strchr(cstate->null_print, '\n') != NULL)
1264 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1265 errmsg("COPY null representation cannot use newline or carriage return")));
1268 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1269 * backslash because it would be ambiguous. We can't allow the other
1270 * cases because data characters matching the delimiter must be
1271 * backslashed, and certain backslash combinations are interpreted
1272 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1273 * more than strictly necessary, but seems best for consistency and
1274 * future-proofing. Likewise we disallow all digits though only octal
1275 * digits are actually dangerous.
1277 if (!cstate->csv_mode &&
1278 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1279 cstate->delim[0]) != NULL)
1281 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1282 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1285 if (!cstate->csv_mode && cstate->header_line)
1287 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1288 errmsg("COPY HEADER available only in CSV mode")));
1291 if (!cstate->csv_mode && cstate->quote != NULL)
1293 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1294 errmsg("COPY quote available only in CSV mode")));
1296 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1298 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1299 errmsg("COPY quote must be a single one-byte character")));
1301 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1303 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1304 errmsg("COPY delimiter and quote must be different")));
1307 if (!cstate->csv_mode && cstate->escape != NULL)
1309 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1310 errmsg("COPY escape available only in CSV mode")));
1312 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1314 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1315 errmsg("COPY escape must be a single one-byte character")));
1317 /* Check force_quote */
1318 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1320 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1321 errmsg("COPY force quote available only in CSV mode")));
1322 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1324 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1325 errmsg("COPY force quote only available using COPY TO")));
1327 /* Check force_notnull */
1328 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1330 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1331 errmsg("COPY force not null available only in CSV mode")));
1332 if (cstate->force_notnull != NIL && !is_from)
1334 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1335 errmsg("COPY force not null only available using COPY FROM")));
1337 /* Check force_null */
1338 if (!cstate->csv_mode && cstate->force_null != NIL)
1340 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1341 errmsg("COPY force null available only in CSV mode")));
1343 if (cstate->force_null != NIL && !is_from)
1345 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1346 errmsg("COPY force null only available using COPY FROM")));
1348 /* Don't allow the delimiter to appear in the null string. */
1349 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1351 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1352 errmsg("COPY delimiter must not appear in the NULL specification")));
1354 /* Don't allow the CSV quote char to appear in the null string. */
1355 if (cstate->csv_mode &&
1356 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1358 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1359 errmsg("CSV quote character must not appear in the NULL specification")));
1363 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1365 * Iff <binary>, unload or reload in the binary format, as opposed to the
1366 * more wasteful but more robust and portable text format.
1368 * Iff <oids>, unload or reload the format that includes OID information.
1369 * On input, we accept OIDs whether or not the table has an OID column,
1370 * but silently drop them if it does not. On output, we report an error
1371 * if the user asks for OIDs in a table that has none (not providing an
1372 * OID column might seem friendlier, but could seriously confuse programs).
1374 * If in the text format, delimit columns with delimiter <delim> and print
1375 * NULL values as <null_print>.
1378 BeginCopy(ParseState *pstate,
1389 MemoryContext oldcontext;
/*
 * Overview: allocate a zeroed CopyStateData plus a private memory context
 * (cstate->copycontext, created under the caller's current context) and
 * switch into it; parse the COPY options; then set up either the target
 * relation (cstate->rel, plus partition tuple-routing state when COPY FROM
 * targets a partitioned table) or an analyzed, rewritten and planned query
 * whose output goes to a DestCopyOut receiver. Finally derive the
 * per-column FORCE_QUOTE / FORCE_NOT_NULL / FORCE_NULL / convert_selectively
 * flags and the encoding settings, and switch back to the caller's context.
 */
1391 /* Allocate workspace and zero all fields */
1392 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1395 * We allocate everything used by a cstate in a new memory context. This
1396 * avoids memory leaks during repeated use of COPY in a query.
1398 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1400 ALLOCSET_DEFAULT_SIZES);
1402 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1404 /* Extract options from the statement node tree */
1405 ProcessCopyOptions(pstate, cstate, is_from, options);
1407 /* Process the source/target relation or query */
1414 tupDesc = RelationGetDescr(cstate->rel);
1416 /* Don't allow COPY w/ OIDs to or from a table without them */
1417 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1419 (errcode(ERRCODE_UNDEFINED_COLUMN),
1420 errmsg("table \"%s\" does not have OIDs",
1421 RelationGetRelationName(cstate->rel))));
1423 /* Initialize state for CopyFrom tuple routing. */
1424 if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1426 PartitionDispatch *partition_dispatch_info;
1427 ResultRelInfo *partitions;
1428 TupleConversionMap **partition_tupconv_maps;
1429 TupleTableSlot *partition_tuple_slot;
1433 ExecSetupPartitionTupleRouting(rel,
1434 &partition_dispatch_info,
1436 &partition_tupconv_maps,
1437 &partition_tuple_slot,
1438 &num_parted, &num_partitions);
1439 cstate->partition_dispatch_info = partition_dispatch_info;
1440 cstate->num_dispatch = num_parted;
1441 cstate->partitions = partitions;
1442 cstate->num_partitions = num_partitions;
1443 cstate->partition_tupconv_maps = partition_tupconv_maps;
1444 cstate->partition_tuple_slot = partition_tuple_slot;
1457 /* Don't allow COPY w/ OIDs from a query */
1460 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1461 errmsg("COPY (query) WITH OIDS is not supported")));
1464 * Run parse analysis and rewrite. Note this also acquires sufficient
1465 * locks on the source table(s).
1467 * Because the parser and planner tend to scribble on their input, we
1468 * make a preliminary copy of the source querytree. This prevents
1469 * problems in the case that the COPY is in a portal or plpgsql
1470 * function and is executed repeatedly. (See also the same hack in
1471 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1473 rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1474 pstate->p_sourcetext, NULL, 0,
1477 /* check that we got back something we can work with */
1478 if (rewritten == NIL)
1481 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1482 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1484 else if (list_length(rewritten) > 1)
1488 /* examine queries to determine which error message to issue */
1489 foreach(lc, rewritten)
1491 Query *q = castNode(Query, lfirst(lc));
1493 if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1495 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1496 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1497 if (q->querySource == QSRC_NON_INSTEAD_RULE)
1499 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1500 errmsg("DO ALSO rules are not supported for the COPY")));
1504 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1505 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1508 query = castNode(Query, linitial(rewritten));
1510 /* The grammar allows SELECT INTO, but we don't support that */
1511 if (query->utilityStmt != NULL &&
1512 IsA(query->utilityStmt, CreateTableAsStmt))
1514 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1515 errmsg("COPY (SELECT INTO) is not supported")));
1517 Assert(query->utilityStmt == NULL);
1520 * Similarly the grammar doesn't enforce the presence of a RETURNING
1521 * clause, but this is required here.
1523 if (query->commandType != CMD_SELECT &&
1524 query->returningList == NIL)
1526 Assert(query->commandType == CMD_INSERT ||
1527 query->commandType == CMD_UPDATE ||
1528 query->commandType == CMD_DELETE);
1531 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1532 errmsg("COPY query must have a RETURNING clause")));
1535 /* plan the query */
1536 plan = pg_plan_query(query, 0, NULL);
1539 * With row level security and a user using "COPY relation TO", we
1540 * have to convert the "COPY relation TO" to a query-based COPY (eg:
1541 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1542 * in any RLS clauses.
1544 * When this happens, we are passed in the relid of the originally
1545 * found relation (which we have locked). As the planner will look up
1546 * the relation again, we double-check here to make sure it found the
1547 * same one that we have locked.
1549 if (queryRelId != InvalidOid)
1552 * Note that with RLS involved there may be multiple relations,
1553 * and while the one we need is almost certainly first, we don't
1554 * make any guarantees of that in the planner, so check the whole
1555 * list and make sure we find the original relation.
1557 if (!list_member_oid(plan->relationOids, queryRelId))
1559 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1560 errmsg("relation referenced by COPY statement has changed")));
1564 * Use a snapshot with an updated command ID to ensure this query sees
1565 * results of any previously executed queries.
1567 PushCopiedSnapshot(GetActiveSnapshot());
1568 UpdateActiveSnapshotCommandId();
/*
 * This snapshot stays pushed while the query runs; EndCopyTo() pops it
 * after ExecutorFinish/ExecutorEnd/FreeQueryDesc.
 */
1570 /* Create dest receiver for COPY OUT */
1571 dest = CreateDestReceiver(DestCopyOut);
1572 ((DR_copy *) dest)->cstate = cstate;
1574 /* Create a QueryDesc requesting no output */
1575 cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1576 GetActiveSnapshot(),
1578 dest, NULL, NULL, 0);
1581 * Call ExecutorStart to prepare the plan for execution.
1583 * ExecutorStart computes a result tupdesc for us
1585 ExecutorStart(cstate->queryDesc, 0);
1587 tupDesc = cstate->queryDesc->tupDesc;
1590 /* Generate or convert list of attributes to process */
/*
 * tupDesc is now either the relation's descriptor or the query's result
 * descriptor, depending on which branch above ran.
 */
1591 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1593 num_phys_attrs = tupDesc->natts;
1595 /* Convert FORCE_QUOTE name list to per-column flags, check validity */
/*
 * Each per-column flag array below is sized for every attribute of
 * tupDesc (num_phys_attrs), not just the copied columns, and is indexed
 * by attnum - 1. Naming a column that is not in attnumlist is an error.
 */
1596 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1597 if (cstate->force_quote_all)
1601 for (i = 0; i < num_phys_attrs; i++)
1602 cstate->force_quote_flags[i] = true;
1604 else if (cstate->force_quote)
1609 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1611 foreach(cur, attnums)
1613 int attnum = lfirst_int(cur);
1615 if (!list_member_int(cstate->attnumlist, attnum))
1617 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1618 errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1619 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1620 cstate->force_quote_flags[attnum - 1] = true;
1624 /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1625 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1626 if (cstate->force_notnull)
1631 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1633 foreach(cur, attnums)
1635 int attnum = lfirst_int(cur);
1637 if (!list_member_int(cstate->attnumlist, attnum))
1639 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1640 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1641 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1642 cstate->force_notnull_flags[attnum - 1] = true;
1646 /* Convert FORCE_NULL name list to per-column flags, check validity */
1647 cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1648 if (cstate->force_null)
1653 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1655 foreach(cur, attnums)
1657 int attnum = lfirst_int(cur);
1659 if (!list_member_int(cstate->attnumlist, attnum))
1661 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1662 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1663 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1664 cstate->force_null_flags[attnum - 1] = true;
1668 /* Convert convert_selectively name list to per-column flags */
1669 if (cstate->convert_selectively)
1674 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1676 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1678 foreach(cur, attnums)
1680 int attnum = lfirst_int(cur);
1682 if (!list_member_int(cstate->attnumlist, attnum))
1684 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1685 errmsg_internal("selected column \"%s\" not referenced by COPY",
1686 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1687 cstate->convert_select_flags[attnum - 1] = true;
1691 /* Use client encoding when ENCODING option is not specified. */
1692 if (cstate->file_encoding < 0)
1693 cstate->file_encoding = pg_get_client_encoding();
1696 * Set up encoding conversion info. Even if the file and server encodings
1697 * are the same, we must apply pg_any_to_server() to validate data in
1698 * multibyte encodings.
1700 cstate->need_transcoding =
1701 (cstate->file_encoding != GetDatabaseEncoding() ||
1702 pg_database_encoding_max_length() > 1);
1703 /* See Multibyte encoding comment above */
1704 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1706 cstate->copy_dest = COPY_FILE; /* default */
1708 MemoryContextSwitchTo(oldcontext);
1714 * Closes the pipe to an external program, checking the pclose() return code.
1717 ClosePipeToProgram(CopyState cstate)
/*
 * Only valid when cstate->is_program is set (asserted below).
 * A ClosePipeStream() result of -1 is reported as a file-access failure,
 * with errno text via %m. Any other nonzero result is treated as the
 * program's wait status: it is reported with ERRCODE_EXTERNAL_ROUTINE_EXCEPTION,
 * and wait_result_to_str() supplies the detail text.
 */
1721 Assert(cstate->is_program);
1723 pclose_rc = ClosePipeStream(cstate->copy_file);
1724 if (pclose_rc == -1)
1726 (errcode_for_file_access(),
1727 errmsg("could not close pipe to external command: %m")));
1728 else if (pclose_rc != 0)
1730 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1731 errmsg("program \"%s\" failed",
1733 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1737 * Release resources allocated in a cstate for COPY TO/FROM.
1740 EndCopy(CopyState cstate)
/*
 * Closes the program pipe when is_program is set. Otherwise, it closes the
 * file only when cstate->filename is non-NULL; a NULL filename means
 * stdout or the frontend, and nothing is closed here. Finally it deletes
 * cstate->copycontext, which frees everything allocated in that context
 * (for example the pstrdup'd filename made by BeginCopyTo).
 */
1742 if (cstate->is_program)
1744 ClosePipeToProgram(cstate);
1748 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1750 (errcode_for_file_access(),
1751 errmsg("could not close file \"%s\": %m",
1752 cstate->filename)));
1755 MemoryContextDelete(cstate->copycontext);
1760 * Setup CopyState to read tuples from a table or a query for COPY TO.
1763 BeginCopyTo(ParseState *pstate,
1767 const char *filename,
1773 bool pipe = (filename == NULL);
1774 MemoryContext oldcontext;
/*
 * COPY TO reads a relation directly only when it is a plain table
 * (RELKIND_RELATION). Every other relkind is rejected. For views,
 * materialized views, foreign tables and partitioned tables, the error
 * hints at COPY (SELECT ...) TO instead.
 */
1776 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1778 if (rel->rd_rel->relkind == RELKIND_VIEW)
1780 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1781 errmsg("cannot copy from view \"%s\"",
1782 RelationGetRelationName(rel)),
1783 errhint("Try the COPY (SELECT ...) TO variant.")));
1784 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1786 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1787 errmsg("cannot copy from materialized view \"%s\"",
1788 RelationGetRelationName(rel)),
1789 errhint("Try the COPY (SELECT ...) TO variant.")));
1790 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1792 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1793 errmsg("cannot copy from foreign table \"%s\"",
1794 RelationGetRelationName(rel)),
1795 errhint("Try the COPY (SELECT ...) TO variant.")));
1796 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1798 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1799 errmsg("cannot copy from sequence \"%s\"",
1800 RelationGetRelationName(rel))));
1801 else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1803 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1804 errmsg("cannot copy from partitioned table \"%s\"",
1805 RelationGetRelationName(rel)),
1806 errhint("Try the COPY (SELECT ...) TO variant.")));
1809 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1810 errmsg("cannot copy from non-table relation \"%s\"",
1811 RelationGetRelationName(rel))));
1814 cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1816 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/*
 * Choose the destination. With no filename ("pipe"), output goes to
 * stdout, or to the client when whereToSendOutput is DestRemote.
 * Otherwise filename is either a command run through OpenPipeStream()
 * (is_program) or a server-side file. A server-side file must be an
 * absolute path and must not be a directory.
 */
1820 Assert(!is_program); /* the grammar does not allow this */
1821 if (whereToSendOutput != DestRemote)
1822 cstate->copy_file = stdout;
1826 cstate->filename = pstrdup(filename);
1827 cstate->is_program = is_program;
1831 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1832 if (cstate->copy_file == NULL)
1834 (errcode_for_file_access(),
1835 errmsg("could not execute command \"%s\": %m",
1836 cstate->filename)));
1840 mode_t oumask; /* Pre-existing umask value */
1844 * Prevent write to relative path ... too easy to shoot oneself in
1845 * the foot by overwriting a database file ...
1847 if (!is_absolute_path(filename))
1849 (errcode(ERRCODE_INVALID_NAME),
1850 errmsg("relative path not allowed for COPY to file")));
/*
 * Mask group/other write permission while creating the output file, so
 * the file is not created group- or world-writable. The previous mask
 * is saved in oumask.
 */
1852 oumask = umask(S_IWGRP | S_IWOTH);
1853 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1855 if (cstate->copy_file == NULL)
1857 /* copy errno because ereport subfunctions might change it */
1858 int save_errno = errno;
1861 (errcode_for_file_access(),
1862 errmsg("could not open file \"%s\" for writing: %m",
1864 (save_errno == ENOENT || save_errno == EACCES) ?
1865 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1866 "You may want a client-side facility such as psql's \\copy.") : 0));
1869 if (fstat(fileno(cstate->copy_file), &st))
1871 (errcode_for_file_access(),
1872 errmsg("could not stat file \"%s\": %m",
1873 cstate->filename)));
1875 if (S_ISDIR(st.st_mode))
1877 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1878 errmsg("\"%s\" is a directory", cstate->filename)));
1882 MemoryContextSwitchTo(oldcontext);
1888 * This intermediate routine exists mainly to localize the effects of setjmp
1889 * so we don't need to plaster a lot of variables with "volatile".
1892 DoCopyTo(CopyState cstate)
/*
 * fe_copy is true when there is no output file and the session's output
 * goes to the remote client (whereToSendOutput == DestRemote). In that
 * case the rows produced by CopyTo() are framed by SendCopyBegin() and
 * SendCopyEnd(). On error, pq_endcopyout(true) is called so that
 * old-style COPY OUT mode is not left switched on.
 */
1894 bool pipe = (cstate->filename == NULL);
1895 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1901 SendCopyBegin(cstate);
1903 processed = CopyTo(cstate);
1906 SendCopyEnd(cstate);
1911 * Make sure we turn off old-style COPY OUT mode upon error. It is
1912 * okay to do this in all cases, since it does nothing if the mode is
1915 pq_endcopyout(true);
1924 * Clean up storage and release resources for COPY TO.
1927 EndCopyTo(CopyState cstate)
/*
 * queryDesc is set only for a query-based COPY TO. In that case the
 * executor is finished and shut down, the QueryDesc is freed, and the
 * snapshot pushed by BeginCopy() (PushCopiedSnapshot) is popped.
 */
1929 if (cstate->queryDesc != NULL)
1931 /* Close down the query and free resources. */
1932 ExecutorFinish(cstate->queryDesc);
1933 ExecutorEnd(cstate->queryDesc);
1934 FreeQueryDesc(cstate->queryDesc);
1935 PopActiveSnapshot();
1938 /* Clean up storage */
1943 * Copy from relation or query TO file.
1946 CopyTo(CopyState cstate)
/*
 * Produces the whole COPY TO stream:
 *   - binary format: the file header (signature, flags, header-extension
 *     length); text/CSV with HEADER: a CSV line of column names;
 *   - one row per tuple, either by scanning cstate->rel and calling
 *     CopyOneRowTo(), or by running the planned query so that its
 *     DR_copy receiver emits the rows;
 *   - binary format: a trailer of int16 -1.
 * The number of rows sent is accumulated in "processed".
 */
1950 Form_pg_attribute *attr;
1955 tupDesc = RelationGetDescr(cstate->rel);
1957 tupDesc = cstate->queryDesc->tupDesc;
1958 attr = tupDesc->attrs;
1959 num_phys_attrs = tupDesc->natts;
1960 cstate->null_print_client = cstate->null_print; /* default */
1962 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1963 cstate->fe_msgbuf = makeLongStringInfo();
1965 /* Get info about the columns we need to process. */
/*
 * out_functions has a slot for every attribute, indexed by attnum - 1.
 * Only the slots for columns in attnumlist are initialized (plain palloc,
 * not palloc0), so callers must index it only with attnums from that list.
 */
1966 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1967 foreach(cur, cstate->attnumlist)
1969 int attnum = lfirst_int(cur);
1974 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1978 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1981 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1985 * Create a temporary memory context that we can reset once per row to
1986 * recover palloc'd memory. This avoids any problems with leaks inside
1987 * datatype output routines, and should be faster than retail pfree's
1988 * anyway. (We don't need a whole econtext as CopyFrom does.)
1990 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1992 ALLOCSET_DEFAULT_SIZES);
1996 /* Generate header for a binary copy */
2000 CopySendData(cstate, BinarySignature, 11);
2005 CopySendInt32(cstate, tmp);
2006 /* No header extension */
2008 CopySendInt32(cstate, tmp);
2013 * For non-binary copy, we need to convert null_print to file
2014 * encoding, because it will be sent directly with CopySendString.
2016 if (cstate->need_transcoding)
2017 cstate->null_print_client = pg_server_to_any(cstate->null_print,
2018 cstate->null_print_len,
2019 cstate->file_encoding);
2021 /* if a header has been requested send the line */
2022 if (cstate->header_line)
2024 bool hdr_delim = false;
2026 foreach(cur, cstate->attnumlist)
2028 int attnum = lfirst_int(cur);
2032 CopySendChar(cstate, cstate->delim[0]);
2035 colname = NameStr(attr[attnum - 1]->attname);
2037 CopyAttributeOutCSV(cstate, colname, false,
2038 list_length(cstate->attnumlist) == 1);
2041 CopySendEndOfRow(cstate);
2049 HeapScanDesc scandesc;
/*
 * values/nulls cover every attribute of the relation, because
 * heap_deform_tuple() fills all of them and CopyOneRowTo() reads them at
 * index attnum - 1.
 */
2052 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2053 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2055 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2058 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2060 CHECK_FOR_INTERRUPTS();
2062 /* Deconstruct the tuple ... faster than repeated heap_getattr */
2063 heap_deform_tuple(tuple, tupDesc, values, nulls);
2065 /* Format and send the data */
2066 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2070 heap_endscan(scandesc);
2077 /* run the plan --- the dest receiver will send tuples */
2078 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2079 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2084 /* Generate trailer for a binary copy */
2085 CopySendInt16(cstate, -1);
2086 /* Need to flush out the trailer */
2087 CopySendEndOfRow(cstate);
2090 MemoryContextDelete(cstate->rowcontext);
2096 * Emit one row during CopyTo().
2099 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
/*
 * values[] and nulls[] are read at index attnum - 1, only for the columns
 * in cstate->attnumlist. cstate->rowcontext is reset on entry, and all
 * per-row allocations (output-function results) are made in it.
 *
 * Text/CSV layout: fields are separated by delim[0], and a NULL field is
 * written as null_print_client.
 * Binary layout: an int16 field count, then for each field an int32 byte
 * length (-1 for NULL) followed by the send-function payload.
 * In either format the OID, when requested, is sent before the columns.
 */
2101 bool need_delim = false;
2102 FmgrInfo *out_functions = cstate->out_functions;
2103 MemoryContext oldcontext;
2107 MemoryContextReset(cstate->rowcontext);
2108 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2112 /* Binary per-tuple header */
2113 CopySendInt16(cstate, list_length(cstate->attnumlist));
2114 /* Send OID if wanted --- note attnumlist doesn't include it */
2117 /* Hack --- assume Oid is same size as int32 */
2118 CopySendInt32(cstate, sizeof(int32));
2119 CopySendInt32(cstate, tupleOid);
2124 /* Text format has no per-tuple header, but send OID if wanted */
2125 /* Assume digits don't need any quoting or encoding conversion */
2128 string = DatumGetCString(DirectFunctionCall1(oidout,
2129 ObjectIdGetDatum(tupleOid)));
2130 CopySendString(cstate, string);
2135 foreach(cur, cstate->attnumlist)
2137 int attnum = lfirst_int(cur);
2138 Datum value = values[attnum - 1];
2139 bool isnull = nulls[attnum - 1];
2141 if (!cstate->binary)
2144 CopySendChar(cstate, cstate->delim[0]);
2150 if (!cstate->binary)
2151 CopySendString(cstate, cstate->null_print_client);
2153 CopySendInt32(cstate, -1);
2157 if (!cstate->binary)
2159 string = OutputFunctionCall(&out_functions[attnum - 1],
2161 if (cstate->csv_mode)
2162 CopyAttributeOutCSV(cstate, string,
2163 cstate->force_quote_flags[attnum - 1],
2164 list_length(cstate->attnumlist) == 1);
2166 CopyAttributeOutText(cstate, string);
2172 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2174 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2175 CopySendData(cstate, VARDATA(outputbytes),
2176 VARSIZE(outputbytes) - VARHDRSZ);
2181 CopySendEndOfRow(cstate);
2183 MemoryContextSwitchTo(oldcontext);
2188 * error context callback for COPY FROM
2190 * The argument for the error context must be CopyState.
2193 CopyFromErrorCallback(void *arg)
/*
 * Adds one errcontext line that always names cur_relname and cur_lineno,
 * plus as much detail as can be shown safely:
 *   - when the data cannot usefully be displayed, at most the column name;
 *   - otherwise the column name with its value (truncated by
 *     limit_printout_length()), or "null input" when there is no value;
 *   - otherwise the whole input line, but only if line_buf is valid and is
 *     already in the server encoding (converted, or no transcoding needed).
 */
2195 CopyState cstate = (CopyState) arg;
2199 /* can't usefully display the data */
2200 if (cstate->cur_attname)
2201 errcontext("COPY %s, line %d, column %s",
2202 cstate->cur_relname, cstate->cur_lineno,
2203 cstate->cur_attname);
2205 errcontext("COPY %s, line %d",
2206 cstate->cur_relname, cstate->cur_lineno);
2210 if (cstate->cur_attname && cstate->cur_attval)
2212 /* error is relevant to a particular column */
2215 attval = limit_printout_length(cstate->cur_attval);
2216 errcontext("COPY %s, line %d, column %s: \"%s\"",
2217 cstate->cur_relname, cstate->cur_lineno,
2218 cstate->cur_attname, attval);
2221 else if (cstate->cur_attname)
2223 /* error is relevant to a particular column, value is NULL */
2224 errcontext("COPY %s, line %d, column %s: null input",
2225 cstate->cur_relname, cstate->cur_lineno,
2226 cstate->cur_attname);
2231 * Error is relevant to a particular line.
2233 * If line_buf still contains the correct line, and it's already
2234 * transcoded, print it. If it's still in a foreign encoding, it's
2235 * quite likely that the error is precisely a failure to do
2236 * encoding conversion (ie, bad data). We dare not try to convert
2237 * it, and at present there's no way to regurgitate it without
2238 * conversion. So we have to punt and just report the line number.
2240 if (cstate->line_buf_valid &&
2241 (cstate->line_buf_converted || !cstate->need_transcoding))
2245 lineval = limit_printout_length(cstate->line_buf.data);
2246 errcontext("COPY %s, line %d: \"%s\"",
2247 cstate->cur_relname, cstate->cur_lineno, lineval);
2252 errcontext("COPY %s, line %d",
2253 cstate->cur_relname, cstate->cur_lineno);
2260 * Make sure we don't print an unreasonable amount of COPY data in a message.
2262 * It would seem a lot easier to just use the sprintf "precision" limit to
2263 * truncate the string. However, some versions of glibc have a bug/misfeature
2264 * that vsnprintf will always fail (return -1) if it is asked to truncate
2265 * a string that contains invalid byte sequences for the current encoding.
2266 * So, do our own truncation. We return a pstrdup'd copy of the input.
2269 limit_printout_length(const char *str)
/*
 * Always returns newly allocated memory. If str is at most
 * MAX_COPY_DATA_DISPLAY bytes, the result is pstrdup(str). Otherwise it
 * is the first pg_mbcliplen() bytes (an encoding-aware clip at that
 * limit) followed by "...", built with palloc.
 */
2271 #define MAX_COPY_DATA_DISPLAY 100
2273 int slen = strlen(str);
2277 /* Fast path if definitely okay */
2278 if (slen <= MAX_COPY_DATA_DISPLAY)
2279 return pstrdup(str);
2281 /* Apply encoding-dependent truncation */
2282 len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
2285 * Truncate, and add "..." to show we truncated the input.
2287 res = (char *) palloc(len + 4);
/* len + 4: room for the three '.' bytes plus the NUL written by strcpy. */
2288 memcpy(res, str, len);
2289 strcpy(res + len, "...");
2295 * Copy FROM file to relation.
2298 CopyFrom(CopyState cstate)
2304 ResultRelInfo *resultRelInfo;
2305 ResultRelInfo *saved_resultRelInfo = NULL;
2306 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2307 ExprContext *econtext;
2308 TupleTableSlot *myslot;
2309 MemoryContext oldcontext = CurrentMemoryContext;
2311 ErrorContextCallback errcallback;
2312 CommandId mycid = GetCurrentCommandId(true);
2313 int hi_options = 0; /* start with default heap_insert options */
2314 BulkInsertState bistate;
2315 uint64 processed = 0;
2316 bool useHeapMultiInsert;
2317 int nBufferedTuples = 0;
2318 int prev_leaf_part_index = -1;
2320 #define MAX_BUFFERED_TUPLES 1000
2321 HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
2322 Size bufferedTuplesSize = 0;
2323 int firstBufferedLineNo = 0;
2325 Assert(cstate->rel);
2328 * The target must be a plain relation or have an INSTEAD OF INSERT row
2329 * trigger. (Currently, such triggers are only allowed on views, so we
2330 * only hint about them in the view case.)
2332 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2333 cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2334 !(cstate->rel->trigdesc &&
2335 cstate->rel->trigdesc->trig_insert_instead_row))
2337 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2339 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2340 errmsg("cannot copy to view \"%s\"",
2341 RelationGetRelationName(cstate->rel)),
2342 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2343 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2345 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2346 errmsg("cannot copy to materialized view \"%s\"",
2347 RelationGetRelationName(cstate->rel))));
2348 else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2350 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2351 errmsg("cannot copy to foreign table \"%s\"",
2352 RelationGetRelationName(cstate->rel))));
2353 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2355 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2356 errmsg("cannot copy to sequence \"%s\"",
2357 RelationGetRelationName(cstate->rel))));
2360 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2361 errmsg("cannot copy to non-table relation \"%s\"",
2362 RelationGetRelationName(cstate->rel))));
2365 tupDesc = RelationGetDescr(cstate->rel);
2368 * Check to see if we can avoid writing WAL
2370 * If archive logging/streaming is not enabled *and* either
2371 * - table was created in same transaction as this COPY
2372 * - data is being written to relfilenode created in this transaction
2373 * then we can skip writing WAL. It's safe because if the transaction
2374 * doesn't commit, we'll discard the table (or the new relfilenode file).
2375 * If it does commit, we'll have done the heap_sync at the bottom of this
2378 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2379 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2380 * can be cleared before the end of the transaction. The exact case is
2381 * when a relation sets a new relfilenode twice in same transaction, yet
2382 * the second one fails in an aborted subtransaction, e.g.
2391 * Also, if the target file is new-in-transaction, we assume that checking
2392 * FSM for free space is a waste of time, even if we must use WAL because
2393 * of archiving. This could possibly be wrong, but it's unlikely.
2395 * The comments for heap_insert and RelationGetBufferForTuple specify that
2396 * skipping WAL logging is only safe if we ensure that our tuples do not
2397 * go into pages containing tuples from any other transactions --- but this
2398 * must be the case if we have a new table or new relfilenode, so we need
2399 * no additional work to enforce that.
2402 /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2403 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2404 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2406 hi_options |= HEAP_INSERT_SKIP_FSM;
2407 if (!XLogIsNeeded())
2408 hi_options |= HEAP_INSERT_SKIP_WAL;
2412 * Optimize if new relfilenode was created in this subxact or one of its
2413 * committed children and we won't see those rows later as part of an
2414 * earlier scan or command. This ensures that if this subtransaction
2415 * aborts then the frozen rows won't be visible after xact cleanup. Note
2416 * that the stronger test of exactly which subtransaction created it is
2417 * crucial for correctness of this optimization.
2421 if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2423 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2424 errmsg("cannot perform FREEZE because of prior transaction activity")));
2426 if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2427 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2429 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2430 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2432 hi_options |= HEAP_INSERT_FROZEN;
2436 * We need a ResultRelInfo so we can use the regular executor's
2437 * index-entry-making machinery. (There used to be a huge amount of code
2438 * here that basically duplicated execUtils.c ...)
2440 resultRelInfo = makeNode(ResultRelInfo);
2441 InitResultRelInfo(resultRelInfo,
2443 1, /* dummy rangetable index */
2447 ExecOpenIndices(resultRelInfo, false);
2449 estate->es_result_relations = resultRelInfo;
2450 estate->es_num_result_relations = 1;
2451 estate->es_result_relation_info = resultRelInfo;
2452 estate->es_range_table = cstate->range_table;
2454 /* Set up a tuple slot too */
2455 myslot = ExecInitExtraTupleSlot(estate);
2456 ExecSetSlotDescriptor(myslot, tupDesc);
2457 /* Triggers might need a slot as well */
2458 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2461 * It's more efficient to prepare a bunch of tuples for insertion, and
2462 * insert them in one heap_multi_insert() call, than call heap_insert()
2463 * separately for every tuple. However, we can't do that if there are
2464 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2465 * expressions. Such triggers or expressions might query the table we're
2466 * inserting to, and act differently if the tuples that have already been
2467 * processed and prepared for insertion are not there. We also can't do
2468 * it if the table is partitioned.
2470 if ((resultRelInfo->ri_TrigDesc != NULL &&
2471 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2472 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2473 cstate->partition_dispatch_info != NULL ||
2474 cstate->volatile_defexprs)
2476 useHeapMultiInsert = false;
2480 useHeapMultiInsert = true;
2481 bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2484 /* Prepare to catch AFTER triggers. */
2485 AfterTriggerBeginQuery();
2488 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2489 * should do this for COPY, since it's not really an "INSERT" statement as
2490 * such. However, executing these triggers maintains consistency with the
2491 * EACH ROW triggers that we already fire on COPY.
2493 ExecBSInsertTriggers(estate, resultRelInfo);
2495 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2496 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2498 bistate = GetBulkInsertState();
2499 econtext = GetPerTupleExprContext(estate);
2501 /* Set up callback to identify error line number */
2502 errcallback.callback = CopyFromErrorCallback;
2503 errcallback.arg = (void *) cstate;
2504 errcallback.previous = error_context_stack;
2505 error_context_stack = &errcallback;
2509 TupleTableSlot *slot;
2511 Oid loaded_oid = InvalidOid;
2513 CHECK_FOR_INTERRUPTS();
2515 if (nBufferedTuples == 0)
2518 * Reset the per-tuple exprcontext. We can only do this if the
2519 * tuple buffer is empty. (Calling the context the per-tuple
2520 * memory context is a bit of a misnomer now.)
2522 ResetPerTupleExprContext(estate);
2525 /* Switch into its memory context */
2526 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2528 if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2531 /* And now we can form the input tuple. */
2532 tuple = heap_form_tuple(tupDesc, values, nulls);
2534 if (loaded_oid != InvalidOid)
2535 HeapTupleSetOid(tuple, loaded_oid);
2538 * Constraints might reference the tableoid column, so initialize
2539 * t_tableOid before evaluating them.
2541 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2543 /* Triggers and stuff need to be invoked in query context. */
2544 MemoryContextSwitchTo(oldcontext);
2546 /* Place tuple in tuple slot --- but slot shouldn't free it */
2548 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2550 /* Determine the partition to heap_insert the tuple into */
2551 if (cstate->partition_dispatch_info)
2553 int leaf_part_index;
2554 TupleConversionMap *map;
2557 * Away we go ... If we end up not finding a partition after all,
2558 * ExecFindPartition() does not return and errors out instead.
2559 * Otherwise, the returned value is to be used as an index into
2560 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
2561 * will get us the ResultRelInfo and TupleConversionMap for the
2562 * partition, respectively.
2564 leaf_part_index = ExecFindPartition(resultRelInfo,
2565 cstate->partition_dispatch_info,
2568 Assert(leaf_part_index >= 0 &&
2569 leaf_part_index < cstate->num_partitions);
2572 * If this tuple is mapped to a partition that is not same as the
2573 * previous one, we'd better make the bulk insert mechanism gets a
2576 if (prev_leaf_part_index != leaf_part_index)
2578 ReleaseBulkInsertStatePin(bistate);
2579 prev_leaf_part_index = leaf_part_index;
2583 * Save the old ResultRelInfo and switch to the one corresponding
2584 * to the selected partition.
2586 saved_resultRelInfo = resultRelInfo;
2587 resultRelInfo = cstate->partitions + leaf_part_index;
2589 /* We do not yet have a way to insert into a foreign partition */
2590 if (resultRelInfo->ri_FdwRoutine)
2592 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2593 errmsg("cannot route inserted tuples to a foreign table")));
2596 * For ExecInsertIndexTuples() to work on the partition's indexes
2598 estate->es_result_relation_info = resultRelInfo;
2601 * We might need to convert from the parent rowtype to the
2602 * partition rowtype.
2604 map = cstate->partition_tupconv_maps[leaf_part_index];
2607 Relation partrel = resultRelInfo->ri_RelationDesc;
2609 tuple = do_convert_tuple(tuple, map);
2612 * We must use the partition's tuple descriptor from this
2613 * point on. Use a dedicated slot from this point on until
2614 * we're finished dealing with the partition.
2616 slot = cstate->partition_tuple_slot;
2617 Assert(slot != NULL);
2618 ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
2619 ExecStoreTuple(tuple, slot, InvalidBuffer, true);
2622 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2627 /* BEFORE ROW INSERT Triggers */
2628 if (resultRelInfo->ri_TrigDesc &&
2629 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2631 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2633 if (slot == NULL) /* "do nothing" */
2635 else /* trigger might have changed tuple */
2636 tuple = ExecMaterializeSlot(slot);
2641 if (resultRelInfo->ri_TrigDesc &&
2642 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2644 /* Pass the data to the INSTEAD ROW INSERT trigger */
2645 ExecIRInsertTriggers(estate, resultRelInfo, slot);
2649 /* Check the constraints of the tuple */
2650 if (cstate->rel->rd_att->constr ||
2651 resultRelInfo->ri_PartitionCheck)
2652 ExecConstraints(resultRelInfo, slot, estate);
2654 if (useHeapMultiInsert)
2656 /* Add this tuple to the tuple buffer */
2657 if (nBufferedTuples == 0)
2658 firstBufferedLineNo = cstate->cur_lineno;
2659 bufferedTuples[nBufferedTuples++] = tuple;
2660 bufferedTuplesSize += tuple->t_len;
2663 * If the buffer filled up, flush it. Also flush if the
2664 * total size of all the tuples in the buffer becomes
2665 * large, to avoid using large amounts of memory for the
2666 * buffer when the tuples are exceptionally wide.
2668 if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2669 bufferedTuplesSize > 65535)
2671 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2672 resultRelInfo, myslot, bistate,
2673 nBufferedTuples, bufferedTuples,
2674 firstBufferedLineNo);
2675 nBufferedTuples = 0;
2676 bufferedTuplesSize = 0;
2681 List *recheckIndexes = NIL;
2683 /* OK, store the tuple and create index entries for it */
2684 heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
2685 hi_options, bistate);
2687 if (resultRelInfo->ri_NumIndices > 0)
2688 recheckIndexes = ExecInsertIndexTuples(slot,
2695 /* AFTER ROW INSERT Triggers */
2696 ExecARInsertTriggers(estate, resultRelInfo, tuple,
2699 list_free(recheckIndexes);
2704 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2705 * this is the same definition used by execMain.c for counting
2706 * tuples inserted by an INSERT command.
2710 if (saved_resultRelInfo)
2712 resultRelInfo = saved_resultRelInfo;
2713 estate->es_result_relation_info = resultRelInfo;
2718 /* Flush any remaining buffered tuples */
2719 if (nBufferedTuples > 0)
2720 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2721 resultRelInfo, myslot, bistate,
2722 nBufferedTuples, bufferedTuples,
2723 firstBufferedLineNo);
2725 /* Done, clean up */
2726 error_context_stack = errcallback.previous;
2728 FreeBulkInsertState(bistate);
2730 MemoryContextSwitchTo(oldcontext);
2733 * In the old protocol, tell pqcomm that we can process normal protocol
2736 if (cstate->copy_dest == COPY_OLD_FE)
2739 /* Execute AFTER STATEMENT insertion triggers */
2740 ExecASInsertTriggers(estate, resultRelInfo);
2742 /* Handle queued AFTER triggers */
2743 AfterTriggerEndQuery(estate);
2748 ExecResetTupleTable(estate->es_tupleTable, false);
2750 ExecCloseIndices(resultRelInfo);
2752 /* Close all the partitioned tables, leaf partitions, and their indices */
2753 if (cstate->partition_dispatch_info)
2758 * Remember cstate->partition_dispatch_info[0] corresponds to the root
2759 * partitioned table, which we must not try to close, because it is
2760 * the main target table of COPY that will be closed eventually by
2761 * DoCopy(). Also, tupslot is NULL for the root partitioned table.
2763 for (i = 1; i < cstate->num_dispatch; i++)
2765 PartitionDispatch pd = cstate->partition_dispatch_info[i];
2767 heap_close(pd->reldesc, NoLock);
2768 ExecDropSingleTupleTableSlot(pd->tupslot);
2770 for (i = 0; i < cstate->num_partitions; i++)
2772 ResultRelInfo *resultRelInfo = cstate->partitions + i;
2774 ExecCloseIndices(resultRelInfo);
2775 heap_close(resultRelInfo->ri_RelationDesc, NoLock);
2778 /* Release the standalone partition tuple descriptor */
2779 ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
2782 FreeExecutorState(estate);
2785 * If we skipped writing WAL, then we need to sync the heap (but not
2786 * indexes since those use WAL anyway)
2788 if (hi_options & HEAP_INSERT_SKIP_WAL)
2789 heap_sync(cstate->rel);
/*
 * CopyFromInsertBatch: flush the tuples that CopyFrom has buffered.
 *
 * What the visible lines do:
 *  - mark cstate->line_buf invalid and remember cstate->cur_lineno;
 *  - call heap_multi_insert() on cstate->rel while the estate's per-tuple
 *    memory context is current (the comment below says heap_multi_insert
 *    leaks memory, so the leak lands in short-lived memory);
 *  - if the result relation has indexes, then for each buffered tuple: set
 *    cur_lineno to firstBufferedLineNo + i, store the tuple in myslot,
 *    create index entries and fire AFTER ROW INSERT triggers;
 *  - otherwise, if AFTER ROW INSERT triggers exist, fire them per tuple
 *    with cur_lineno set the same way;
 *  - finally restore cur_lineno to its saved value.
 *
 * NOTE(review): this text is a lossy, line-numbered extraction of copy.c.
 * The embedded numbering skips lines (e.g. 2796 -> 2800, 2822 -> 2828), so
 * comment delimiters, braces, (presumably) the return-type line, the
 * declaration of the loop variable i and the remaining heap_multi_insert()
 * arguments are absent.  It does not compile as-is; restore the missing
 * lines from upstream before building.
 */
2795 * A subroutine of CopyFrom, to write the current batch of buffered heap
2796 * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2800 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2801 int hi_options, ResultRelInfo *resultRelInfo,
2802 TupleTableSlot *myslot, BulkInsertState bistate,
2803 int nBufferedTuples, HeapTuple *bufferedTuples,
2804 int firstBufferedLineNo)
2806 MemoryContext oldcontext;
2808 int save_cur_lineno;
/*
 * line_buf holds the most recently read input line.  That is not
 * necessarily the line of the tuple being processed here, so clearing
 * line_buf_valid presumably stops the error context callback from quoting
 * it.  cur_lineno is instead pointed at each buffered tuple's own line below.
 */
2811 * Print error context information correctly, if one of the operations
2814 cstate->line_buf_valid = false;
2815 save_cur_lineno = cstate->cur_lineno;
2818 * heap_multi_insert leaks memory, so switch to short-lived memory context
2819 * before calling it.
2821 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2822 heap_multi_insert(cstate->rel,
2828 MemoryContextSwitchTo(oldcontext);
2831 * If there are any indexes, update them for all the inserted tuples, and
2832 * run AFTER ROW INSERT triggers.
2834 if (resultRelInfo->ri_NumIndices > 0)
2836 for (i = 0; i < nBufferedTuples; i++)
/*
 * NOTE(review): as listed, recheckIndexes has no initializer, and the result
 * of ExecInsertIndexTuples() below is never assigned to it.  The upstream
 * line between 2841 and 2843 is missing; presumably it is
 * "recheckIndexes =".  Without it, list_free() would read an indeterminate
 * pointer.  Confirm that line is restored.
 */
2838 List *recheckIndexes;
2840 cstate->cur_lineno = firstBufferedLineNo + i;
2841 ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2843 ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2844 estate, false, NULL, NIL);
2845 ExecARInsertTriggers(estate, resultRelInfo,
2848 list_free(recheckIndexes);
/* No indexes: only AFTER ROW INSERT triggers (if any) need per-tuple work. */
2853 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
2856 else if (resultRelInfo->ri_TrigDesc != NULL &&
2857 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
2859 for (i = 0; i < nBufferedTuples; i++)
2861 cstate->cur_lineno = firstBufferedLineNo + i;
2862 ExecARInsertTriggers(estate, resultRelInfo,
2868 /* reset cur_lineno to where we were */
2869 cstate->cur_lineno = save_cur_lineno;
/*
 * BeginCopyFrom: set up a CopyState for reading rows in COPY FROM.
 *
 * From the visible lines: the state comes from BeginCopy().  The
 * per-attribute input functions, typioparams and default expressions are
 * allocated while cstate->copycontext is current.  Then the data source is
 * chosen:
 *  - a callback: copy_dest = COPY_CALLBACK and data_source_cb is stored;
 *  - filename == NULL ("pipe"): stdin, with ReceiveCopyBegin() called first
 *    when whereToSendOutput == DestRemote;
 *  - is_program: a pipe opened with OpenPipeStream(filename);
 *  - otherwise: AllocateFile(filename), rejecting directories.
 * In binary mode the file header is read and validated here.  In text/CSV
 * mode the raw_fields workspace is allocated instead.
 *
 * NOTE(review): this is a lossy, line-numbered extraction.  Several
 * parameter lines (e.g. rel, is_program, attnamelist, options), most
 * declarations, the branch conditions that select the data source, the
 * ereport(ERROR, ...) call heads and the trailing return are missing.
 * Restore them from upstream before building.
 */
2873 * Setup to read tuples from a file for COPY FROM.
2875 * 'rel': Used as a template for the tuples
2876 * 'filename': Name of server-local file to read
2877 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2878 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2880 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2883 BeginCopyFrom(ParseState *pstate,
2885 const char *filename,
2887 copy_data_source_cb data_source_cb,
2892 bool pipe = (filename == NULL);
2894 Form_pg_attribute *attr;
2895 AttrNumber num_phys_attrs,
2897 FmgrInfo *in_functions;
2902 ExprState **defexprs;
2903 MemoryContext oldcontext;
2904 bool volatile_defexprs;
2906 cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
2907 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2909 /* Initialize state variables */
2910 cstate->fe_eof = false;
2911 cstate->eol_type = EOL_UNKNOWN;
2912 cstate->cur_relname = RelationGetRelationName(cstate->rel);
2913 cstate->cur_lineno = 0;
2914 cstate->cur_attname = NULL;
2915 cstate->cur_attval = NULL;
2917 /* Set up variables to avoid per-attribute overhead. */
2918 initLongStringInfo(&cstate->attribute_buf);
2919 initLongStringInfo(&cstate->line_buf);
2920 cstate->line_buf_converted = false;
/* One extra byte beyond RAW_BUF_SIZE is allocated for the raw input buffer. */
2921 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2922 cstate->raw_buf_index = cstate->raw_buf_len = 0;
2924 tupDesc = RelationGetDescr(cstate->rel);
2925 attr = tupDesc->attrs;
2926 num_phys_attrs = tupDesc->natts;
2928 volatile_defexprs = false;
2931 * Pick up the required catalog information for each attribute in the
2932 * relation, including the input function, the element type (to pass to
2933 * the input function), and info about defaults and constraints. (Which
2934 * input function we use depends on text/binary format choice.)
2936 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2937 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
2938 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
2939 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
/* attnum is 1-based; the per-attribute arrays are indexed by attnum - 1. */
2941 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
2943 /* We don't need info for dropped attributes */
2944 if (attr[attnum - 1]->attisdropped)
2947 /* Fetch the input function and typioparam info */
/*
 * getTypeBinaryInputInfo() is used for binary format and getTypeInputInfo()
 * otherwise.  The if/else on the format that selects between them is not
 * present in this listing.
 */
2949 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
2950 &in_func_oid, &typioparams[attnum - 1]);
2952 getTypeInputInfo(attr[attnum - 1]->atttypid,
2953 &in_func_oid, &typioparams[attnum - 1]);
2954 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
2956 /* Get default info if needed */
2957 if (!list_member_int(cstate->attnumlist, attnum))
2959 /* attribute is NOT to be copied from input */
2960 /* use default value if one exists */
2961 Expr *defexpr = (Expr *) build_column_default(cstate->rel,
2964 if (defexpr != NULL)
2966 /* Run the expression through planner */
2967 defexpr = expression_planner(defexpr);
/*
 * defexprs/defmap are packed: entry k holds the k-th default expression and
 * the 0-based column it fills.  num_defaults is presumably incremented on a
 * line missing from this listing.
 */
2969 /* Initialize executable expression in copycontext */
2970 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
2971 defmap[num_defaults] = attnum - 1;
2975 * If a default expression looks at the table being loaded,
2976 * then it could give the wrong answer when using
2977 * multi-insert. Since database access can be dynamic this is
2978 * hard to test for exactly, so we use the much wider test of
2979 * whether the default expression is volatile. We allow for
2980 * the special case of when the default expression is the
2981 * nextval() of a sequence which in this specific case is
2982 * known to be safe for use with the multi-insert
2983 * optimization. Hence we use this special case function
2984 * checker rather than the standard check for
2985 * contain_volatile_functions().
2987 if (!volatile_defexprs)
2988 volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
2993 /* We keep those variables in cstate. */
2994 cstate->in_functions = in_functions;
2995 cstate->typioparams = typioparams;
2996 cstate->defmap = defmap;
2997 cstate->defexprs = defexprs;
2998 cstate->volatile_defexprs = volatile_defexprs;
2999 cstate->num_defaults = num_defaults;
3000 cstate->is_program = is_program;
/*
 * Data source selection.  The branch conditions are missing from this
 * listing.  The COPY_CALLBACK branch is presumably taken when data_source_cb
 * is non-NULL, and the stdin branch when pipe is true.
 */
3004 cstate->copy_dest = COPY_CALLBACK;
3005 cstate->data_source_cb = data_source_cb;
3009 Assert(!is_program); /* the grammar does not allow this */
3010 if (whereToSendOutput == DestRemote)
3011 ReceiveCopyBegin(cstate);
3013 cstate->copy_file = stdin;
3017 cstate->filename = pstrdup(filename);
3019 if (cstate->is_program)
3021 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3022 if (cstate->copy_file == NULL)
3024 (errcode_for_file_access(),
3025 errmsg("could not execute command \"%s\": %m",
3026 cstate->filename)));
3032 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3033 if (cstate->copy_file == NULL)
3035 /* copy errno because ereport subfunctions might change it */
3036 int save_errno = errno;
3039 (errcode_for_file_access(),
3040 errmsg("could not open file \"%s\" for reading: %m",
3042 (save_errno == ENOENT || save_errno == EACCES) ?
3043 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3044 "You may want a client-side facility such as psql's \\copy.") : 0));
/* A directory can be opened on some platforms, so it is rejected explicitly. */
3047 if (fstat(fileno(cstate->copy_file), &st))
3049 (errcode_for_file_access(),
3050 errmsg("could not stat file \"%s\": %m",
3051 cstate->filename)));
3053 if (S_ISDIR(st.st_mode))
3055 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3056 errmsg("\"%s\" is a directory", cstate->filename)));
3060 if (!cstate->binary)
3062 /* must rely on user to tell us... */
3063 cstate->file_has_oids = cstate->oids;
/*
 * Binary header layout as checked below: an 11-byte signature that must match
 * BinarySignature, a 32-bit flags word, a 32-bit header-extension length,
 * then that many extension bytes, which are skipped.
 */
3067 /* Read and verify binary header */
3072 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3073 memcmp(readSig, BinarySignature, 11) != 0)
3075 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3076 errmsg("COPY file signature not recognized")));
3078 if (!CopyGetInt32(cstate, &tmp))
3080 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3081 errmsg("invalid COPY file header (missing flags)")));
/*
 * Flags word: bit 16 means the file carries OIDs.  Any other bit in the
 * upper 16 is an unrecognized critical flag and is rejected.
 * NOTE(review): this listing has no line between the two statements below
 * that clears bit 16.  Without it, every file with OIDs would fail the
 * (tmp >> 16) test.  Upstream presumably has tmp &= ~(1 << 16); confirm.
 */
3082 cstate->file_has_oids = (tmp & (1 << 16)) != 0;
3084 if ((tmp >> 16) != 0)
3086 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3087 errmsg("unrecognized critical flags in COPY file header")));
3088 /* Header extension length */
3089 if (!CopyGetInt32(cstate, &tmp) ||
3092 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3093 errmsg("invalid COPY file header (missing length)")));
3094 /* Skip extension header, if present */
3097 if (CopyGetData(cstate, readSig, 1, 1) != 1)
3099 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3100 errmsg("invalid COPY file header (wrong length)")));
3104 if (cstate->file_has_oids && cstate->binary)
3106 getTypeBinaryInputInfo(OIDOID,
3107 &in_func_oid, &cstate->oid_typioparam);
3108 fmgr_info(in_func_oid, &cstate->oid_in_function);
/*
 * Text/CSV: raw_fields has one slot per copied column, plus one for the OID
 * when file_has_oids is set.
 */
3111 /* create workspace for CopyReadAttributes results */
3112 if (!cstate->binary)
3114 AttrNumber attr_count = list_length(cstate->attnumlist);
3115 int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
3117 cstate->max_fields = nfields;
3118 cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
3121 MemoryContextSwitchTo(oldcontext);
/*
 * NextCopyFromRawFields: read one input line and split it into raw field
 * strings.  The CSV or text parser is chosen by cstate->csv_mode.  When
 * cstate->header_line is set and nothing has been read yet (cur_lineno == 0),
 * one line is read and discarded first.  cur_lineno is advanced for every
 * line read, including that header line.  Only valid for non-binary input
 * (asserted).
 *
 * NOTE(review): this is a lossy, line-numbered extraction.  The return type,
 * the local declarations (done, fldct), the braces, the body of the
 * "EOF at start of line" test (presumably return false), the else before
 * the text-mode parser, and the final *nfields assignment and return are
 * missing.
 */
3127 * Read raw fields in the next line for COPY FROM in text or csv mode.
3128 * Return false if no more lines.
3130 * An internal temporary buffer is returned via 'fields'. It is valid until
3131 * the next call of the function. Since the function returns all raw fields
3132 * in the input file, 'nfields' could be different from the number of columns
3135 * NOTE: force_not_null option are not applied to the returned fields.
3138 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3143 /* only available for text or csv input */
3144 Assert(!cstate->binary);
3146 /* on input just throw the header line away */
3147 if (cstate->cur_lineno == 0 && cstate->header_line)
3149 cstate->cur_lineno++;
3150 if (CopyReadLine(cstate))
3151 return false; /* done */
3154 cstate->cur_lineno++;
3156 /* Actually read the line into memory here */
3157 done = CopyReadLine(cstate);
3160 * EOF at start of line means we're done. If we see EOF after some
3161 * characters, we act as though it was newline followed by EOF, ie,
3162 * process the line and then exit loop on next iteration.
3164 if (done && cstate->line_buf.len == 0)
/* The returned array is cstate->raw_fields, which is overwritten on the next call. */
3167 /* Parse the line into de-escaped field values */
3168 if (cstate->csv_mode)
3169 fldct = CopyReadAttributesCSV(cstate);
3171 fldct = CopyReadAttributesText(cstate);
3173 *fields = cstate->raw_fields;
/*
 * NextCopyFrom: produce the next row's column values in values/nulls.
 *
 * values/nulls are first reset to all-NULL.  Input is then read as follows:
 *  - Text/CSV: read with NextCopyFromRawFields(), and each copied column is
 *    converted with its input function.  An optional leading OID field is
 *    validated.
 *  - Binary: a 16-bit field count is read first (-1 marks the end of data),
 *    then each field via CopyReadBinaryAttribute(), the OID field included.
 * The OID is returned through *tupleOid only when cstate->oids is set and
 * tupleOid is non-NULL.  Columns not supplied by the input are then filled
 * from the default expressions, evaluated in econtext.
 *
 * NOTE(review): this is a lossy, line-numbered extraction.  The
 * declarations, braces, ereport(ERROR, call heads, several if/else heads
 * (e.g. the file_has_oids tests and the csv null checks) and the return
 * statements are missing.
 */
3179 * Read next tuple from file for COPY FROM. Return false if no more tuples.
3181 * 'econtext' is used to evaluate default expression for each columns not
3182 * read from the file. It can be NULL when no default values are used, i.e.
3183 * when all columns are read from the file.
3185 * 'values' and 'nulls' arrays must be the same length as columns of the
3186 * relation passed to BeginCopyFrom. This function fills the arrays.
3187 * Oid of the tuple is returned with 'tupleOid' separately.
3190 NextCopyFrom(CopyState cstate, ExprContext *econtext,
3191 Datum *values, bool *nulls, Oid *tupleOid)
3194 Form_pg_attribute *attr;
3195 AttrNumber num_phys_attrs,
3197 num_defaults = cstate->num_defaults;
3198 FmgrInfo *in_functions = cstate->in_functions;
3199 Oid *typioparams = cstate->typioparams;
3203 bool file_has_oids = cstate->file_has_oids;
3204 int *defmap = cstate->defmap;
3205 ExprState **defexprs = cstate->defexprs;
3207 tupDesc = RelationGetDescr(cstate->rel);
3208 attr = tupDesc->attrs;
3209 num_phys_attrs = tupDesc->natts;
3210 attr_count = list_length(cstate->attnumlist);
/* Expected field count: one per copied column, plus one for the OID. */
3211 nfields = file_has_oids ? (attr_count + 1) : attr_count;
3213 /* Initialize all values for row to NULL */
3214 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3215 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3217 if (!cstate->binary)
3219 char **field_strings;
3225 /* read raw fields in the next line */
3226 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
/*
 * Surplus fields are rejected here.  Too few fields are caught per column
 * below ("missing data for ...").
 */
3229 /* check for overflowing fields */
3230 if (nfields > 0 && fldct > nfields)
3232 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3233 errmsg("extra data after last expected column")));
3237 /* Read the OID field if present */
3240 if (fieldno >= fldct)
3242 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3243 errmsg("missing data for OID column")));
3244 string = field_strings[fieldno++];
3248 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3249 errmsg("null OID in COPY data")));
3250 else if (cstate->oids && tupleOid != NULL)
/*
 * cur_attname/cur_attval are set only while a value is being converted.
 * Presumably this lets the error context name the column and the value.
 */
3252 cstate->cur_attname = "oid";
3253 cstate->cur_attval = string;
3254 *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
3255 CStringGetDatum(string)));
3256 if (*tupleOid == InvalidOid)
3258 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3259 errmsg("invalid OID in COPY data")));
3260 cstate->cur_attname = NULL;
3261 cstate->cur_attval = NULL;
3265 /* Loop to read the user attributes on the line. */
3266 foreach(cur, cstate->attnumlist)
3268 int attnum = lfirst_int(cur);
3271 if (fieldno >= fldct)
3273 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3274 errmsg("missing data for column \"%s\"",
3275 NameStr(attr[m]->attname))));
3276 string = field_strings[fieldno++];
/*
 * When convert_select_flags is set, columns whose flag is false are not
 * converted and stay NULL.
 */
3278 if (cstate->convert_select_flags &&
3279 !cstate->convert_select_flags[m])
3281 /* ignore input field, leaving column as NULL */
3285 if (cstate->csv_mode)
3287 if (string == NULL &&
3288 cstate->force_notnull_flags[m])
3291 * FORCE_NOT_NULL option is set and column is NULL -
3292 * convert it to the NULL string.
3294 string = cstate->null_print;
3296 else if (string != NULL && cstate->force_null_flags[m]
3297 && strcmp(string, cstate->null_print) == 0)
3300 * FORCE_NULL option is set and column matches the NULL
3301 * string. It must have been quoted, or otherwise the
3302 * string would already have been set to NULL. Convert it
3303 * to NULL as specified.
3309 cstate->cur_attname = NameStr(attr[m]->attname);
3310 cstate->cur_attval = string;
3311 values[m] = InputFunctionCall(&in_functions[m],
3314 attr[m]->atttypmod);
3317 cstate->cur_attname = NULL;
3318 cstate->cur_attval = NULL;
3321 Assert(fieldno == nfields);
/* Binary format: each tuple starts with a 16-bit field count. */
3329 cstate->cur_lineno++;
3331 if (!CopyGetInt16(cstate, &fld_count))
3333 /* EOF detected (end of file, or protocol-level EOF) */
3337 if (fld_count == -1)
3340 * Received EOF marker. In a V3-protocol copy, wait for the
3341 * protocol-level EOF, and complain if it doesn't come
3342 * immediately. This ensures that we correctly handle CopyFail,
3343 * if client chooses to send that now.
3345 * Note that we MUST NOT try to read more data in an old-protocol
3346 * copy, since there is no protocol-level EOF marker then. We
3347 * could go either way for copy from file, but choose to throw
3348 * error if there's data after the EOF marker, for consistency
3349 * with the new-protocol case.
3353 if (cstate->copy_dest != COPY_OLD_FE &&
3354 CopyGetData(cstate, &dummy, 1, 1) > 0)
3356 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3357 errmsg("received copy data after EOF marker")));
/*
 * The count is compared with attr_count, not nfields, so the OID field (when
 * present) is not included in fld_count.
 */
3361 if (fld_count != attr_count)
3363 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3364 errmsg("row field count is %d, expected %d",
3365 (int) fld_count, attr_count)));
3371 cstate->cur_attname = "oid";
3373 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
3375 &cstate->oid_in_function,
3376 cstate->oid_typioparam,
3379 if (isnull || loaded_oid == InvalidOid)
3381 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3382 errmsg("invalid OID in COPY data")));
3383 cstate->cur_attname = NULL;
3384 if (cstate->oids && tupleOid != NULL)
3385 *tupleOid = loaded_oid;
3389 foreach(cur, cstate->attnumlist)
3391 int attnum = lfirst_int(cur);
3394 cstate->cur_attname = NameStr(attr[m]->attname);
3396 values[m] = CopyReadBinaryAttribute(cstate,
3402 cstate->cur_attname = NULL;
/*
 * Fill defaulted columns.  defmap[i] is the 0-based column index for
 * defexprs[i].  The value and null flag are written by ExecEvalExpr().
 */
3407 * Now compute and insert any defaults available for the columns not
3408 * provided by the input data. Anything not processed here or above will
3411 for (i = 0; i < num_defaults; i++)
3414 * The caller must supply econtext and have switched into the
3415 * per-tuple memory context in it.
3417 Assert(econtext != NULL);
3418 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
3420 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
/*
 * EndCopyFrom: finish a COPY FROM that was started with BeginCopyFrom().
 *
 * NOTE(review): only the header fragment, the signature and one body comment
 * survive in this listing.  The rest of the body (presumably the call that
 * releases cstate's memory and open source) is missing; restore it from
 * upstream.
 */
3428 * Clean up storage and release resources for COPY FROM.
3431 EndCopyFrom(CopyState cstate)
3433 /* No COPY FROM related resources except memory. */
/*
 * CopyReadLine: read the next input line into cstate->line_buf.
 *
 * Steps in this function:
 *  1. Reset line_buf and mark it valid but not yet encoding-converted.
 *  2. Call CopyReadLineText() to transfer the line from raw_buf.
 *  3. On EOF with the new (v3) frontend protocol, discard buffered raw data
 *     until CopyLoadRawBuf() reports no more.
 *  4. Otherwise, strip the end-of-line marker indicated by cstate->eol_type.
 *  5. If need_transcoding is set, convert the line to the server encoding.
 *  6. Set line_buf_converted.
 *
 * NOTE(review): this is a lossy, line-numbered extraction.  The return type,
 * the declarations, the "if (result)" and do-loop heads, the switch's case
 * labels and the final return of result are missing.
 */
3439 * Read the next input line and stash it in line_buf, with conversion to
3442 * Result is true if read was terminated by EOF, false if terminated
3443 * by newline. The terminating newline or EOF marker is not included
3444 * in the final value of line_buf.
3447 CopyReadLine(CopyState cstate)
3451 resetStringInfo(&cstate->line_buf);
3452 cstate->line_buf_valid = true;
3454 /* Mark that encoding conversion hasn't occurred yet */
3455 cstate->line_buf_converted = false;
3457 /* Parse data and transfer into line_buf */
3458 result = CopyReadLineText(cstate);
3463 * Reached EOF. In protocol version 3, we should ignore anything
3464 * after \. up to the protocol end of copy data. (XXX maybe better
3465 * not to treat \. as special?)
3467 if (cstate->copy_dest == COPY_NEW_FE)
/* Drop whatever is buffered, then keep loading until no more data arrives. */
3471 cstate->raw_buf_index = cstate->raw_buf_len;
3472 } while (CopyLoadRawBuf(cstate));
3478 * If we didn't hit EOF, then we must have transferred the EOL marker
3479 * to line_buf along with the data. Get rid of it.
3481 switch (cstate->eol_type)
/* Line ended in a newline (presumably the EOL_NL case): drop the trailing \n. */
3484 Assert(cstate->line_buf.len >= 1);
3485 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3486 cstate->line_buf.len--;
3487 cstate->line_buf.data[cstate->line_buf.len] = '\0';
/* Line ended in a carriage return (presumably EOL_CR): drop the trailing \r. */
3490 Assert(cstate->line_buf.len >= 1);
3491 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3492 cstate->line_buf.len--;
3493 cstate->line_buf.data[cstate->line_buf.len] = '\0';
/* Line ended in CR LF (presumably EOL_CRNL): drop the trailing \r\n. */
3496 Assert(cstate->line_buf.len >= 2);
3497 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3498 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3499 cstate->line_buf.len -= 2;
3500 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3503 /* shouldn't get here */
3509 /* Done reading the line. Convert it to server encoding. */
3510 if (cstate->need_transcoding)
3514 cvt = pg_any_to_server(cstate->line_buf.data,
3515 cstate->line_buf.len,
3516 cstate->file_encoding);
/*
 * pg_any_to_server() may return the input pointer unchanged.  Only a
 * distinct result is copied back into line_buf.
 * NOTE(review): no pfree(cvt) appears here (upstream line 3522 is absent
 * from this listing); confirm the converted copy is freed.
 */
3517 if (cvt != cstate->line_buf.data)
3519 /* transfer converted data back to line_buf */
3520 resetStringInfo(&cstate->line_buf);
3521 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3526 /* Now it's safe to use the buffer in error messages */
3527 cstate->line_buf_converted = true;
3533 * CopyReadLineText - inner loop of CopyReadLine for text mode
3536 CopyReadLineText(CopyState cstate)
3541 bool need_data = false;
3542 bool hit_eof = false;
3543 bool result = false;
3547 bool first_char_in_line = true;
3548 bool in_quote = false,
3549 last_was_esc = false;
3551 char escapec = '\0';
3553 if (cstate->csv_mode)
3555 quotec = cstate->quote[0];
3556 escapec = cstate->escape[0];
3557 /* ignore special escape processing if it's the same as quotec */
3558 if (quotec == escapec)
3562 mblen_str[1] = '\0';
3565 * The objective of this loop is to transfer the entire next input line
3566 * into line_buf. Hence, we only care for detecting newlines (\r and/or
3567 * \n) and the end-of-copy marker (\.).
3569 * In CSV mode, \r and \n inside a quoted field are just part of the data
3570 * value and are put in line_buf. We keep just enough state to know if we
3571 * are currently in a quoted field or not.
3573 * These four characters, and the CSV escape and quote characters, are
3574 * assumed the same in frontend and backend encodings.
3576 * For speed, we try to move data from raw_buf to line_buf in chunks
3577 * rather than one character at a time. raw_buf_ptr points to the next
3578 * character to examine; any characters from raw_buf_index to raw_buf_ptr
3579 * have been determined to be part of the line, but not yet transferred to
3582 * For a little extra speed within the loop, we copy raw_buf and
3583 * raw_buf_len into local variables.
3585 copy_raw_buf = cstate->raw_buf;
3586 raw_buf_ptr = cstate->raw_buf_index;
3587 copy_buf_len = cstate->raw_buf_len;
3595 * Load more data if needed. Ideally we would just force four bytes
3596 * of read-ahead and avoid the many calls to
3597 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3598 * does not allow us to read too far ahead or we might read into the
3599 * next data, so we read-ahead only as far we know we can. One
3600 * optimization would be to read-ahead four byte here if
3601 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3602 * considering the size of the buffer.
3604 if (raw_buf_ptr >= copy_buf_len || need_data)
3609 * Try to read some more data. This will certainly reset
3610 * raw_buf_index to zero, and raw_buf_ptr must go with it.
3612 if (!CopyLoadRawBuf(cstate))
3615 copy_buf_len = cstate->raw_buf_len;
3618 * If we are completely out of data, break out of the loop,
3621 if (copy_buf_len <= 0)
3629 /* OK to fetch a character */
3630 prev_raw_ptr = raw_buf_ptr;
3631 c = copy_raw_buf[raw_buf_ptr++];
3633 if (cstate->csv_mode)
3636 * If character is '\\' or '\r', we may need to look ahead below.
3637 * Force fetch of the next character if we don't already have it.
3638 * We need to do this before changing CSV state, in case one of
3639 * these characters is also the quote or escape character.
3641 * Note: old-protocol does not like forced prefetch, but it's OK
3642 * here since we cannot validly be at EOF.
3644 if (c == '\\' || c == '\r')
3646 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3650 * Dealing with quotes and escapes here is mildly tricky. If the
3651 * quote char is also the escape char, there's no problem - we
3652 * just use the char as a toggle. If they are different, we need
3653 * to ensure that we only take account of an escape inside a
3654 * quoted field and immediately preceding a quote char, and not
3655 * the second in an escape-escape sequence.
3657 if (in_quote && c == escapec)
3658 last_was_esc = !last_was_esc;
3659 if (c == quotec && !last_was_esc)
3660 in_quote = !in_quote;
3662 last_was_esc = false;
3665 * Updating the line count for embedded CR and/or LF chars is
3666 * necessarily a little fragile - this test is probably about the
3667 * best we can do. (XXX it's arguable whether we should do this
3668 * at all --- is cur_lineno a physical or logical count?)
3670 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3671 cstate->cur_lineno++;
3675 if (c == '\r' && (!cstate->csv_mode || !in_quote))
3677 /* Check for \r\n on first line, _and_ handle \r\n. */
3678 if (cstate->eol_type == EOL_UNKNOWN ||
3679 cstate->eol_type == EOL_CRNL)
3682 * If need more data, go back to loop top to load it.
3684 * Note that if we are at EOF, c will wind up as '\0' because
3685 * of the guaranteed pad of raw_buf.
3687 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3690 c = copy_raw_buf[raw_buf_ptr];
3694 raw_buf_ptr++; /* eat newline */
3695 cstate->eol_type = EOL_CRNL; /* in case not set yet */
3699 /* found \r, but no \n */
3700 if (cstate->eol_type == EOL_CRNL)
3702 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3704 errmsg("literal carriage return found in data") :
3705 errmsg("unquoted carriage return found in data"),
3707 errhint("Use \"\\r\" to represent carriage return.") :
3708 errhint("Use quoted CSV field to represent carriage return.")));
3711 * if we got here, it is the first line and we didn't find
3712 * \n, so don't consume the peeked character
3714 cstate->eol_type = EOL_CR;
3717 else if (cstate->eol_type == EOL_NL)
3719 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3721 errmsg("literal carriage return found in data") :
3722 errmsg("unquoted carriage return found in data"),
3724 errhint("Use \"\\r\" to represent carriage return.") :
3725 errhint("Use quoted CSV field to represent carriage return.")));
3726 /* If reach here, we have found the line terminator */
3731 if (c == '\n' && (!cstate->csv_mode || !in_quote))
3733 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3735 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3737 errmsg("literal newline found in data") :
3738 errmsg("unquoted newline found in data"),
3740 errhint("Use \"\\n\" to represent newline.") :
3741 errhint("Use quoted CSV field to represent newline.")));
3742 cstate->eol_type = EOL_NL; /* in case not set yet */
3743 /* If reach here, we have found the line terminator */
3748 * In CSV mode, we only recognize \. alone on a line. This is because
3749 * \. is a valid CSV data value.
3751 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3755 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3756 IF_NEED_REFILL_AND_EOF_BREAK(0);
3759 * get next character
3760 * Note: we do not change c so if it isn't \., we can fall
3761 * through and continue processing for file encoding.
3764 c2 = copy_raw_buf[raw_buf_ptr];
3768 raw_buf_ptr++; /* consume the '.' */
3771 * Note: if we loop back for more data here, it does not
3772 * matter that the CSV state change checks are re-executed; we
3773 * will come back here with no important state changed.
3775 if (cstate->eol_type == EOL_CRNL)
3777 /* Get the next character */
3778 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3779 /* if hit_eof, c2 will become '\0' */
3780 c2 = copy_raw_buf[raw_buf_ptr++];
3784 if (!cstate->csv_mode)
3786 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3787 errmsg("end-of-copy marker does not match previous newline style")));
3789 NO_END_OF_COPY_GOTO;
3791 else if (c2 != '\r')
3793 if (!cstate->csv_mode)
3795 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3796 errmsg("end-of-copy marker corrupt")));
3798 NO_END_OF_COPY_GOTO;
3802 /* Get the next character */
3803 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3804 /* if hit_eof, c2 will become '\0' */
3805 c2 = copy_raw_buf[raw_buf_ptr++];
3807 if (c2 != '\r' && c2 != '\n')
3809 if (!cstate->csv_mode)
3811 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3812 errmsg("end-of-copy marker corrupt")));
3814 NO_END_OF_COPY_GOTO;
3817 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3818 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3819 (cstate->eol_type == EOL_CR && c2 != '\r'))
3822 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3823 errmsg("end-of-copy marker does not match previous newline style")));
3827 * Transfer only the data before the \. into line_buf, then
3828 * discard the data and the \. sequence.
3830 if (prev_raw_ptr > cstate->raw_buf_index)
3831 appendBinaryStringInfo(&cstate->line_buf,
3832 cstate->raw_buf + cstate->raw_buf_index,
3833 prev_raw_ptr - cstate->raw_buf_index);
3834 cstate->raw_buf_index = raw_buf_ptr;
3835 result = true; /* report EOF */
3838 else if (!cstate->csv_mode)
3841 * If we are here, it means we found a backslash followed by
3842 * something other than a period. In non-CSV mode, anything
3843 * after a backslash is special, so we skip over that second
3844 * character too. If we didn't do that \\. would be
3845 * considered an eof-of copy, while in non-CSV mode it is a
3846 * literal backslash followed by a period. In CSV mode,
3847 * backslashes are not special, so we want to process the
3848 * character after the backslash just like a normal character,
3849 * so we don't increment in those cases.
3855 * This label is for CSV cases where \. appears at the start of a
3856 * line, but there is more text after it, meaning it was a data value.
3857 * We are more strict for \. in CSV mode because \. could be a data
3858 * value, while in non-CSV mode, \. cannot be a data value.
3863 * Process all bytes of a multi-byte character as a group.
3865 * We only support multi-byte sequences where the first byte has the
3866 * high-bit set, so as an optimization we can avoid this block
3867 * entirely if it is not set.
3869 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3874 /* All our encodings only read the first byte to get the length */
3875 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
3876 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
3877 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3878 raw_buf_ptr += mblen - 1;
3880 first_char_in_line = false;
3881 } /* end of outer loop */
3884 * Transfer any still-uncopied data to line_buf.
3892 * Return decimal value for a hexadecimal digit
3895 GetDecimalFromHex(char hex)
3897 if (isdigit((unsigned char) hex))
3900 return tolower((unsigned char) hex) - 'a' + 10;
3904 * Parse the current line into separate attributes (fields),
3905 * performing de-escaping as needed.
3907 * The input is in line_buf. We use attribute_buf to hold the result
3908 * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3909 * string, or NULL when the input matches the null marker string.
3910 * This array is expanded as necessary.
3912 * (Note that the caller cannot check for nulls since the returned
3913 * string would be the post-de-escaping equivalent, which may look
3914 * the same as some valid data string.)
3916 * delim is the column delimiter string (must be just one byte for now).
3917 * null_print is the null marker string. Note that this is compared to
3918 * the pre-de-escaped input string.
3920 * The return value is the number of fields actually read.
3923 CopyReadAttributesText(CopyState cstate)
3925 char delimc = cstate->delim[0];
3932 * We need a special case for zero-column tables: check that the input
3933 * line is empty, and return.
3935 if (cstate->max_fields <= 0)
3937 if (cstate->line_buf.len != 0)
3939 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3940 errmsg("extra data after last expected column")));
3944 resetStringInfo(&cstate->attribute_buf);
3947 * The de-escaped attributes will certainly not be longer than the input
3948 * data line, so we can just force attribute_buf to be large enough and
3949 * then transfer data without any checks for enough space. We need to do
3950 * it this way because enlarging attribute_buf mid-stream would invalidate
3951 * pointers already stored into cstate->raw_fields[].
3953 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3954 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3955 output_ptr = cstate->attribute_buf.data;
3957 /* set pointer variables for loop */
3958 cur_ptr = cstate->line_buf.data;
3959 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3961 /* Outer loop iterates over fields */
3965 bool found_delim = false;
3969 bool saw_non_ascii = false;
3971 /* Make sure there is enough space for the next value */
3972 if (fieldno >= cstate->max_fields)
3974 cstate->max_fields *= 2;
3975 cstate->raw_fields =
3976 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3979 /* Remember start of field on both input and output sides */
3980 start_ptr = cur_ptr;
3981 cstate->raw_fields[fieldno] = output_ptr;
3984 * Scan data for field.
3986 * Note that in this loop, we are scanning to locate the end of field
3987 * and also speculatively performing de-escaping. Once we find the
3988 * end-of-field, we can match the raw field contents against the null
3989 * marker string. Only after that comparison fails do we know that
3990 * de-escaping is actually the right thing to do; therefore we *must
3991 * not* throw any syntax errors before we've done the null-marker
3999 if (cur_ptr >= line_end_ptr)
4009 if (cur_ptr >= line_end_ptr)
4027 if (cur_ptr < line_end_ptr)
4033 val = (val << 3) + OCTVALUE(c);
4034 if (cur_ptr < line_end_ptr)
4040 val = (val << 3) + OCTVALUE(c);
4046 if (c == '\0' || IS_HIGHBIT_SET(c))
4047 saw_non_ascii = true;
4052 if (cur_ptr < line_end_ptr)
4054 char hexchar = *cur_ptr;
4056 if (isxdigit((unsigned char) hexchar))
4058 int val = GetDecimalFromHex(hexchar);
4061 if (cur_ptr < line_end_ptr)
4064 if (isxdigit((unsigned char) hexchar))
4067 val = (val << 4) + GetDecimalFromHex(hexchar);
4071 if (c == '\0' || IS_HIGHBIT_SET(c))
4072 saw_non_ascii = true;
4096 * in all other cases, take the char after '\'
4102 /* Add c to output string */
4106 /* Check whether raw input matched null marker */
4107 input_len = end_ptr - start_ptr;
4108 if (input_len == cstate->null_print_len &&
4109 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4110 cstate->raw_fields[fieldno] = NULL;
4114 * At this point we know the field is supposed to contain data.
4116 * If we de-escaped any non-7-bit-ASCII chars, make sure the
4117 * resulting string is valid data for the db encoding.
4121 char *fld = cstate->raw_fields[fieldno];
4123 pg_verifymbstr(fld, output_ptr - fld, false);
4127 /* Terminate attribute value in output area */
4128 *output_ptr++ = '\0';
4131 /* Done if we hit EOL instead of a delim */
4136 /* Clean up state of attribute_buf */
4138 Assert(*output_ptr == '\0');
4139 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4145 * Parse the current line into separate attributes (fields),
4146 * performing de-escaping as needed. This has exactly the same API as
4147 * CopyReadAttributesText, except we parse the fields according to
4148 * "standard" (i.e. common) CSV usage.
4151 CopyReadAttributesCSV(CopyState cstate)
4153 char delimc = cstate->delim[0];
4154 char quotec = cstate->quote[0];
4155 char escapec = cstate->escape[0];
4162 * We need a special case for zero-column tables: check that the input
4163 * line is empty, and return.
4165 if (cstate->max_fields <= 0)
4167 if (cstate->line_buf.len != 0)
4169 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4170 errmsg("extra data after last expected column")));
4174 resetStringInfo(&cstate->attribute_buf);
4177 * The de-escaped attributes will certainly not be longer than the input
4178 * data line, so we can just force attribute_buf to be large enough and
4179 * then transfer data without any checks for enough space. We need to do
4180 * it this way because enlarging attribute_buf mid-stream would invalidate
4181 * pointers already stored into cstate->raw_fields[].
4183 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4184 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4185 output_ptr = cstate->attribute_buf.data;
4187 /* set pointer variables for loop */
4188 cur_ptr = cstate->line_buf.data;
4189 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4191 /* Outer loop iterates over fields */
4195 bool found_delim = false;
4196 bool saw_quote = false;
4201 /* Make sure there is enough space for the next value */
4202 if (fieldno >= cstate->max_fields)
4204 cstate->max_fields *= 2;
4205 cstate->raw_fields =
4206 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4209 /* Remember start of field on both input and output sides */
4210 start_ptr = cur_ptr;
4211 cstate->raw_fields[fieldno] = output_ptr;
4214 * Scan data for field,
4216 * The loop starts in "not quote" mode and then toggles between that
4217 * and "in quote" mode. The loop exits normally if it is in "not
4218 * quote" mode and a delimiter or line end is seen.
4228 if (cur_ptr >= line_end_ptr)
4231 /* unquoted field delimiter */
4237 /* start of quoted field (or part of field) */
4243 /* Add c to output string */
4251 if (cur_ptr >= line_end_ptr)
4253 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4254 errmsg("unterminated CSV quoted field")));
4258 /* escape within a quoted field */
4262 * peek at the next char if available, and escape it if it
4263 * is an escape char or a quote char
4265 if (cur_ptr < line_end_ptr)
4267 char nextc = *cur_ptr;
4269 if (nextc == escapec || nextc == quotec)
4271 *output_ptr++ = nextc;
4279 * end of quoted field. Must do this test after testing for
4280 * escape in case quote char and escape char are the same
4281 * (which is the common case).
4286 /* Add c to output string */
4292 /* Terminate attribute value in output area */
4293 *output_ptr++ = '\0';
4295 /* Check whether raw input matched null marker */
4296 input_len = end_ptr - start_ptr;
4297 if (!saw_quote && input_len == cstate->null_print_len &&
4298 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4299 cstate->raw_fields[fieldno] = NULL;
4302 /* Done if we hit EOL instead of a delim */
4307 /* Clean up state of attribute_buf */
4309 Assert(*output_ptr == '\0');
4310 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4317 * Read a binary attribute
4320 CopyReadBinaryAttribute(CopyState cstate,
4321 int column_no, FmgrInfo *flinfo,
4322 Oid typioparam, int32 typmod,
4328 if (!CopyGetInt32(cstate, &fld_size))
4330 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4331 errmsg("unexpected EOF in COPY data")));
4335 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4339 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4340 errmsg("invalid field size")));
4342 /* reset attribute_buf to empty, and load raw data in it */
4343 resetStringInfo(&cstate->attribute_buf);
4345 enlargeStringInfo(&cstate->attribute_buf, fld_size);
4346 if (CopyGetData(cstate, cstate->attribute_buf.data,
4347 fld_size, fld_size) != fld_size)
4349 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4350 errmsg("unexpected EOF in COPY data")));
4352 cstate->attribute_buf.len = fld_size;
4353 cstate->attribute_buf.data[fld_size] = '\0';
4355 /* Call the column type's binary input converter */
4356 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4357 typioparam, typmod);
4359 /* Trouble if it didn't eat the whole buffer */
4360 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4362 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4363 errmsg("incorrect binary data format")));
4370 * Send text representation of one attribute, with conversion and escaping
4372 #define DUMPSOFAR() \
4375 CopySendData(cstate, start, ptr - start); \
4379 CopyAttributeOutText(CopyState cstate, char *string)
4384 char delimc = cstate->delim[0];
4386 if (cstate->need_transcoding)
4387 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4392 * We have to grovel through the string searching for control characters
4393 * and instances of the delimiter character. In most cases, though, these
4394 * are infrequent. To avoid overhead from calling CopySendData once per
4395 * character, we dump out all characters between escaped characters in a
4396 * single call. The loop invariant is that the data from "start" to "ptr"
4397 * can be sent literally, but hasn't yet been.
4399 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4400 * in valid backend encodings, extra bytes of a multibyte character never
4401 * look like ASCII. This loop is sufficiently performance-critical that
4402 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4403 * of the normal safe-encoding path.
4405 if (cstate->encoding_embeds_ascii)
4408 while ((c = *ptr) != '\0')
4410 if ((unsigned char) c < (unsigned char) 0x20)
4413 * \r and \n must be escaped, the others are traditional. We
4414 * prefer to dump these using the C-like notation, rather than
4415 * a backslash and the literal character, because it makes the
4416 * dump file a bit more proof against Microsoftish data
4440 /* If it's the delimiter, must backslash it */
4443 /* All ASCII control chars are length 1 */
4445 continue; /* fall to end of loop */
4447 /* if we get here, we need to convert the control char */
4449 CopySendChar(cstate, '\\');
4450 CopySendChar(cstate, c);
4451 start = ++ptr; /* do not include char in next run */
4453 else if (c == '\\' || c == delimc)
4456 CopySendChar(cstate, '\\');
4457 start = ptr++; /* we include char in next run */
4459 else if (IS_HIGHBIT_SET(c))
4460 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4468 while ((c = *ptr) != '\0')
4470 if ((unsigned char) c < (unsigned char) 0x20)
4473 * \r and \n must be escaped, the others are traditional. We
4474 * prefer to dump these using the C-like notation, rather than
4475 * a backslash and the literal character, because it makes the
4476 * dump file a bit more proof against Microsoftish data
4500 /* If it's the delimiter, must backslash it */
4503 /* All ASCII control chars are length 1 */
4505 continue; /* fall to end of loop */
4507 /* if we get here, we need to convert the control char */
4509 CopySendChar(cstate, '\\');
4510 CopySendChar(cstate, c);
4511 start = ++ptr; /* do not include char in next run */
4513 else if (c == '\\' || c == delimc)
4516 CopySendChar(cstate, '\\');
4517 start = ptr++; /* we include char in next run */
4528 * Send text representation of one attribute, with conversion and
4529 * CSV-style escaping
4532 CopyAttributeOutCSV(CopyState cstate, char *string,
4533 bool use_quote, bool single_attr)
4538 char delimc = cstate->delim[0];
4539 char quotec = cstate->quote[0];
4540 char escapec = cstate->escape[0];
4542 /* force quoting if it matches null_print (before conversion!) */
4543 if (!use_quote && strcmp(string, cstate->null_print) == 0)
4546 if (cstate->need_transcoding)
4547 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4552 * Make a preliminary pass to discover if it needs quoting
4557 * Because '\.' can be a data value, quote it if it appears alone on a
4558 * line so it is not interpreted as the end-of-data marker.
4560 if (single_attr && strcmp(ptr, "\\.") == 0)
4566 while ((c = *tptr) != '\0')
4568 if (c == delimc || c == quotec || c == '\n' || c == '\r')
4573 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4574 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4583 CopySendChar(cstate, quotec);
4586 * We adopt the same optimization strategy as in CopyAttributeOutText
4589 while ((c = *ptr) != '\0')
4591 if (c == quotec || c == escapec)
4594 CopySendChar(cstate, escapec);
4595 start = ptr; /* we include char in next run */
4597 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4598 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4604 CopySendChar(cstate, quotec);
4608 /* If it doesn't need quoting, we can just dump it as-is */
4609 CopySendString(cstate, ptr);
4614 * CopyGetAttnums - build an integer list of attnums to be copied
4616 * The input attnamelist is either the user-specified column list,
4617 * or NIL if there was none (in which case we want all the non-dropped
4620 * rel can be NULL ... it's only used for error reports.
4623 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4625 List *attnums = NIL;
4627 if (attnamelist == NIL)
4629 /* Generate default column list */
4630 Form_pg_attribute *attr = tupDesc->attrs;
4631 int attr_count = tupDesc->natts;
4634 for (i = 0; i < attr_count; i++)
4636 if (attr[i]->attisdropped)
4638 attnums = lappend_int(attnums, i + 1);
4643 /* Validate the user-supplied list and extract attnums */
4646 foreach(l, attnamelist)
4648 char *name = strVal(lfirst(l));
4652 /* Lookup column name */
4653 attnum = InvalidAttrNumber;
4654 for (i = 0; i < tupDesc->natts; i++)
4656 if (tupDesc->attrs[i]->attisdropped)
4658 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4660 attnum = tupDesc->attrs[i]->attnum;
4664 if (attnum == InvalidAttrNumber)
4668 (errcode(ERRCODE_UNDEFINED_COLUMN),
4669 errmsg("column \"%s\" of relation \"%s\" does not exist",
4670 name, RelationGetRelationName(rel))));
4673 (errcode(ERRCODE_UNDEFINED_COLUMN),
4674 errmsg("column \"%s\" does not exist",
4677 /* Check for duplicates */
4678 if (list_member_int(attnums, attnum))
4680 (errcode(ERRCODE_DUPLICATE_COLUMN),
4681 errmsg("column \"%s\" specified more than once",
4683 attnums = lappend_int(attnums, attnum);
4692 * copy_dest_startup --- executor startup
4695 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4701 * copy_dest_receive --- receive one tuple
4704 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4706 DR_copy *myState = (DR_copy *) self;
4707 CopyState cstate = myState->cstate;
4709 /* Make sure the tuple is fully deconstructed */
4710 slot_getallattrs(slot);
4712 /* And send the data */
4713 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4714 myState->processed++;
4720 * copy_dest_shutdown --- executor end
4723 copy_dest_shutdown(DestReceiver *self)
4729 * copy_dest_destroy --- release DestReceiver object
4732 copy_dest_destroy(DestReceiver *self)
4738 * CreateCopyDestReceiver -- create a suitable DestReceiver object
4741 CreateCopyDestReceiver(void)
4743 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4745 self->pub.receiveSlot = copy_dest_receive;
4746 self->pub.rStartup = copy_dest_startup;
4747 self->pub.rShutdown = copy_dest_shutdown;
4748 self->pub.rDestroy = copy_dest_destroy;
4749 self->pub.mydest = DestCopyOut;
4751 self->cstate = NULL; /* will be set later */
4752 self->processed = 0;
4754 return (DestReceiver *) self;