1 /*-------------------------------------------------------------------------
4 * Implements the COPY utility command
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/commands/copy.c
13 *-------------------------------------------------------------------------
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "parser/parse_relation.h"
41 #include "rewrite/rewriteHandler.h"
42 #include "storage/fd.h"
43 #include "tcop/tcopprot.h"
44 #include "utils/builtins.h"
45 #include "utils/lsyscache.h"
46 #include "utils/memutils.h"
47 #include "utils/portal.h"
48 #include "utils/rel.h"
49 #include "utils/rls.h"
50 #include "utils/snapmgr.h"
/* ISOCTAL(c) is true when c is one of the digit characters 0 through 7. */
53 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
/* OCTVALUE(c) maps a digit character to its numeric value; it performs no range check itself. */
54 #define OCTVALUE(c) ((c) - '0')
57 * Represents the different source/dest cases we need to worry about at
62 COPY_FILE, /* to/from file (or a piped program) */
63 COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
64 COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
65 COPY_CALLBACK /* to/from callback function */
69 * Represents the end-of-line terminator type of the input
80 * This struct contains all the state variables used throughout a COPY
81 * operation. For simplicity, we use the same struct for all variants of COPY,
82 * even though some fields are used in only some cases.
84 * Multi-byte encodings: all supported client-side encodings encode multi-byte
85 * characters by having the first byte's high bit set. Subsequent bytes of the
86 * character can have the high bit not set. When scanning data in such an
87 * encoding to look for a match to a single-byte (ie ASCII) character, we must
88 * use the full pg_encoding_mblen() machinery to skip over multibyte
89 * characters, else we might find a false match to a trailing byte. In
90 * supported server encodings, there is no possibility of a false match, and
91 * it's faster to make useless comparisons to trailing bytes than it is to
92 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
93 * when we have to do it the hard way.
/*
 * CopyStateData: all state for one COPY operation, shared by every COPY
 * variant.  The fields fall into groups: low-level I/O state, options taken
 * from the COPY command, bookkeeping used only for error messages, working
 * state for COPY TO and for COPY FROM, and the three buffers used by textual
 * COPY FROM (attribute_buf, line_buf and raw_buf).
 */
95 typedef struct CopyStateData
97 /* low-level state data */
98 CopyDest copy_dest; /* type of copy source/destination */
99 FILE *copy_file; /* used if copy_dest == COPY_FILE */
100 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
101 * dest == COPY_NEW_FE in COPY FROM */
102 bool fe_eof; /* true if detected end of copy data */
103 EolType eol_type; /* EOL type of input */
104 int file_encoding; /* file or remote side's character encoding */
105 bool need_transcoding; /* file encoding diff from server? */
106 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
108 /* parameters from the COPY command */
109 Relation rel; /* relation to copy to or from */
110 QueryDesc *queryDesc; /* executable query to copy from */
111 List *attnumlist; /* integer list of attnums to copy */
112 char *filename; /* filename, or NULL for STDIN/STDOUT */
113 bool is_program; /* is 'filename' a program to popen? */
114 copy_data_source_cb data_source_cb; /* function for reading data */
115 bool binary; /* binary format? */
116 bool oids; /* include OIDs? */
117 bool freeze; /* freeze rows on loading? */
118 bool csv_mode; /* Comma Separated Value format? */
119 bool header_line; /* CSV header line? */
120 char *null_print; /* NULL marker string (server encoding!) */
121 int null_print_len; /* length of same */
122 char *null_print_client; /* same converted to file encoding */
123 char *delim; /* column delimiter (must be 1 byte) */
124 char *quote; /* CSV quote char (must be 1 byte) */
125 char *escape; /* CSV escape char (must be 1 byte) */
126 List *force_quote; /* list of column names */
127 bool force_quote_all; /* FORCE_QUOTE *? */
128 bool *force_quote_flags; /* per-column CSV FQ flags */
129 List *force_notnull; /* list of column names */
130 bool *force_notnull_flags; /* per-column CSV FNN flags */
131 List *force_null; /* list of column names */
132 bool *force_null_flags; /* per-column CSV FN flags */
133 bool convert_selectively; /* do selective binary conversion? */
134 List *convert_select; /* list of column names (can be NIL) */
135 bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
137 /* these are just for error messages, see CopyFromErrorCallback */
138 const char *cur_relname; /* table name for error messages */
139 int cur_lineno; /* line number for error messages */
140 const char *cur_attname; /* current att for error messages */
141 const char *cur_attval; /* current att value for error messages */
144 * Working state for COPY TO/FROM
146 MemoryContext copycontext; /* per-copy execution context */
149 * Working state for COPY TO
151 FmgrInfo *out_functions; /* lookup info for output functions */
152 MemoryContext rowcontext; /* per-row evaluation context */
155 * Working state for COPY FROM
157 AttrNumber num_defaults; /* NOTE(review): presumably the number of entries in defmap and defexprs; confirm */
159 FmgrInfo oid_in_function; /* NOTE(review): presumably the input function used for the OID column; confirm */
161 FmgrInfo *in_functions; /* array of input functions, one per attribute */
162 Oid *typioparams; /* array of element types for in_functions */
163 int *defmap; /* array of default att numbers */
164 ExprState **defexprs; /* array of default att expressions */
165 bool volatile_defexprs; /* is any of defexprs volatile? */
168 PartitionDispatch *partition_dispatch_info; /* array with num_dispatch entries */
169 int num_dispatch; /* Number of entries in the above array */
170 int num_partitions; /* Number of members in the following arrays */
171 ResultRelInfo *partitions; /* Per partition result relation */
172 TupleConversionMap **partition_tupconv_maps;
173 TupleTableSlot *partition_tuple_slot;
174 TransitionCaptureState *transition_capture;
175 TupleConversionMap **transition_tupconv_maps;
178 * These variables are used to reduce overhead in textual COPY FROM.
180 * attribute_buf holds the separated, de-escaped text for each field of
181 * the current line. The CopyReadAttributes functions return arrays of
182 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
183 * the buffer on each cycle.
185 StringInfoData attribute_buf;
187 /* field raw data pointers found by COPY FROM */
193 * Similarly, line_buf holds the whole input line being processed. The
194 * input cycle is first to read the whole line into line_buf, convert it
195 * to server encoding there, and then extract the individual attribute
196 * fields into attribute_buf. line_buf is preserved unmodified so that we
197 * can display it in error messages if appropriate.
199 StringInfoData line_buf;
200 bool line_buf_converted; /* converted to server encoding? */
201 bool line_buf_valid; /* contains the row being processed? */
204 * Finally, raw_buf holds raw data read from the data source (file or
205 * client connection). CopyReadLine parses this data sufficiently to
206 * locate line boundaries, then transfers the data to line_buf and
207 * converts it. Note: we guarantee that there is a \0 at
208 * raw_buf[raw_buf_len].
210 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
212 int raw_buf_index; /* next byte to process */
213 int raw_buf_len; /* total # of bytes stored */
216 /* DestReceiver for COPY (query) TO */
219 DestReceiver pub; /* publicly-known function pointers */
220 CopyState cstate; /* CopyStateData for the command */
221 uint64 processed; /* # of tuples processed */
226 * These macros centralize code used to process line_buf and raw_buf buffers.
227 * They are macros because they often do continue/break control and to avoid
228 * function call overhead in tight COPY loops.
230 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
231 * prevent the continue/break processing from working. We end the "if (1)"
232 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
233 * any "else" in the calling code, and to avoid any compiler warnings about
234 * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
238 * This keeps the character read at the top of the loop in the buffer
239 * even if there is more than one read-ahead.
241 #define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
244 if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
246 raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
252 /* This consumes the remainder of the buffer and breaks */
253 #define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
256 if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
259 raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
260 /* backslash just before EOF, treat as data char */ \
267 * Transfer any approved data to line_buf; must do this to be sure
268 * there is some room in raw_buf.
270 #define REFILL_LINEBUF \
273 if (raw_buf_ptr > cstate->raw_buf_index) \
275 appendBinaryStringInfo(&cstate->line_buf, \
276 cstate->raw_buf + cstate->raw_buf_index, \
277 raw_buf_ptr - cstate->raw_buf_index); \
278 cstate->raw_buf_index = raw_buf_ptr; \
282 /* Undo any read-ahead and jump out of the block. */
283 #define NO_END_OF_COPY_GOTO \
286 raw_buf_ptr = prev_raw_ptr + 1; \
287 goto not_end_of_copy; \
/*
 * BinarySignature: an 11-byte constant.  The initializer supplies exactly 11
 * characters (the letters PGCOPY, a newline, octal 377, a carriage return, a
 * newline and an explicit NUL), so the array holds no extra terminator.
 * NOTE(review): presumably the file signature of the binary COPY format;
 * its users are not visible in this part of the file - confirm.
 */
290 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
293 /* non-export function prototypes */
294 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
295 RawStmt *raw_query, Oid queryRelId, List *attnamelist,
297 static void EndCopy(CopyState cstate);
298 static void ClosePipeToProgram(CopyState cstate);
299 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
300 Oid queryRelId, const char *filename, bool is_program,
301 List *attnamelist, List *options);
302 static void EndCopyTo(CopyState cstate);
303 static uint64 DoCopyTo(CopyState cstate);
304 static uint64 CopyTo(CopyState cstate);
305 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
306 Datum *values, bool *nulls);
307 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
308 CommandId mycid, int hi_options,
309 ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
310 BulkInsertState bistate,
311 int nBufferedTuples, HeapTuple *bufferedTuples,
312 int firstBufferedLineNo);
313 static bool CopyReadLine(CopyState cstate);
314 static bool CopyReadLineText(CopyState cstate);
315 static int CopyReadAttributesText(CopyState cstate);
316 static int CopyReadAttributesCSV(CopyState cstate);
317 static Datum CopyReadBinaryAttribute(CopyState cstate,
318 int column_no, FmgrInfo *flinfo,
319 Oid typioparam, int32 typmod,
321 static void CopyAttributeOutText(CopyState cstate, char *string);
322 static void CopyAttributeOutCSV(CopyState cstate, char *string,
323 bool use_quote, bool single_attr);
324 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
326 static char *limit_printout_length(const char *str);
328 /* Low-level communications functions */
329 static void SendCopyBegin(CopyState cstate);
330 static void ReceiveCopyBegin(CopyState cstate);
331 static void SendCopyEnd(CopyState cstate);
332 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
333 static void CopySendString(CopyState cstate, const char *str);
334 static void CopySendChar(CopyState cstate, char c);
335 static void CopySendEndOfRow(CopyState cstate);
336 static int CopyGetData(CopyState cstate, void *databuf,
337 int minread, int maxread);
338 static void CopySendInt32(CopyState cstate, int32 val);
339 static bool CopyGetInt32(CopyState cstate, int32 *val);
340 static void CopySendInt16(CopyState cstate, int16 val);
341 static bool CopyGetInt16(CopyState cstate, int16 *val);
345 * Send copy start/stop messages for frontend copies. These have changed
346 * in past protocol redesigns.
/*
 * SendCopyBegin: announce to the frontend that COPY data is about to be sent.
 *
 * cstate  the COPY state; attnumlist and binary are read, copy_dest is set.
 *
 * When the frontend protocol major version is 3 or higher, a message of type
 * H is built containing the overall format code (1 if cstate->binary, else
 * 0), the column count taken from list_length(attnumlist), and the same
 * format code once per column; copy_dest then becomes COPY_NEW_FE.
 *
 * On the old-protocol path an empty message of type H is sent and copy_dest
 * becomes COPY_OLD_FE.  That path also contains the error stating that COPY
 * BINARY is not supported to stdout or from stdin.
 */
349 SendCopyBegin(CopyState cstate)
351 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
355 int natts = list_length(cstate->attnumlist);
356 int16 format = (cstate->binary ? 1 : 0);
359 pq_beginmessage(&buf, 'H');
360 pq_sendbyte(&buf, format); /* overall format */
361 pq_sendint(&buf, natts, 2);
362 for (i = 0; i < natts; i++)
363 pq_sendint(&buf, format, 2); /* per-column formats */
365 cstate->copy_dest = COPY_NEW_FE;
372 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
373 errmsg("COPY BINARY is not supported to stdout or from stdin")));
374 pq_putemptymessage('H');
375 /* grottiness needed for old COPY OUT protocol */
377 cstate->copy_dest = COPY_OLD_FE;
/*
 * ReceiveCopyBegin: tell the frontend that the backend is ready to receive
 * COPY data.
 *
 * cstate  the COPY state; attnumlist and binary are read, copy_dest is set,
 *         and on the new-protocol path fe_msgbuf is allocated.
 *
 * When the frontend protocol major version is 3 or higher, a message of type
 * G is built with the overall format code (1 if cstate->binary, else 0), the
 * column count, and the format code repeated per column; copy_dest becomes
 * COPY_NEW_FE and fe_msgbuf is created with makeStringInfo.
 *
 * On the old-protocol path an empty message of type G is sent and copy_dest
 * becomes COPY_OLD_FE.  That path also contains the error stating that COPY
 * BINARY is not supported to stdout or from stdin.
 *
 * As the closing comment says, output must be flushed afterwards so the
 * frontend knows it may start sending.
 */
382 ReceiveCopyBegin(CopyState cstate)
384 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
388 int natts = list_length(cstate->attnumlist);
389 int16 format = (cstate->binary ? 1 : 0);
392 pq_beginmessage(&buf, 'G');
393 pq_sendbyte(&buf, format); /* overall format */
394 pq_sendint(&buf, natts, 2);
395 for (i = 0; i < natts; i++)
396 pq_sendint(&buf, format, 2); /* per-column formats */
398 cstate->copy_dest = COPY_NEW_FE;
399 cstate->fe_msgbuf = makeStringInfo();
406 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
407 errmsg("COPY BINARY is not supported to stdout or from stdin")));
408 pq_putemptymessage('G');
409 /* any error in old protocol will make us lose sync */
411 cstate->copy_dest = COPY_OLD_FE;
413 /* We *must* flush here to ensure FE knows it can send. */
/*
 * SendCopyEnd: finish a COPY that was sending data to the frontend.
 *
 * cstate  the COPY state; copy_dest selects the protocol variant.
 *
 * For COPY_NEW_FE the output buffer is asserted to be empty and an empty
 * message of type c (Copy Done) is sent.  Otherwise a backslash-period
 * trailer is queued, CopySendEndOfRow flushes it (adding the newline), and
 * pq_endcopyout(false) is called.
 */
418 SendCopyEnd(CopyState cstate)
420 if (cstate->copy_dest == COPY_NEW_FE)
422 /* Shouldn't have any unsent data */
423 Assert(cstate->fe_msgbuf->len == 0);
424 /* Send Copy Done message */
425 pq_putemptymessage('c');
429 CopySendData(cstate, "\\.", 2);
430 /* Need to flush out the trailer (this also appends a newline) */
431 CopySendEndOfRow(cstate);
432 pq_endcopyout(false);
437 * CopySendData sends output data to the destination (file or frontend)
438 * CopySendString does the same for null-terminated strings
439 * CopySendChar does the same for single characters
440 * CopySendEndOfRow does the appropriate thing at end of each data row
441 * (data is not actually flushed except by CopySendEndOfRow)
443 * NB: no data conversion is applied by these functions
/*
 * CopySendData: queue datasize bytes starting at databuf by appending them
 * to cstate->fe_msgbuf.  Nothing is written to the destination here; the
 * bytes stay buffered.
 */
447 CopySendData(CopyState cstate, const void *databuf, int datasize)
449 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
/*
 * CopySendString: queue the NUL-terminated string str by appending its
 * strlen(str) bytes to cstate->fe_msgbuf.
 */
453 CopySendString(CopyState cstate, const char *str)
455 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
/*
 * CopySendChar: queue the single character c by appending it to
 * cstate->fe_msgbuf.
 */
459 CopySendChar(CopyState cstate, char c)
461 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
/*
 * CopySendEndOfRow: terminate the current output row and hand the buffered
 * bytes in cstate->fe_msgbuf to the destination chosen by cstate->copy_dest.
 *
 * Branches visible below (the case labels themselves are not shown in this
 * listing, so the mapping to CopyDest values is inferred from the calls):
 *  - file or program: a line terminator is appended first (a newline, or a
 *    carriage return plus newline; per the existing comment the choice
 *    depends on platform), then the whole buffer is written to copy_file
 *    with one fwrite.  On failure, a program destination first goes through
 *    ClosePipeToProgram, which may raise a more specific error, before the
 *    could-not-write error is reported; a plain file reports the
 *    could-not-write error directly.
 *  - old-protocol frontend: a newline is appended and the buffer is sent
 *    with pq_putbytes; failure is reported as a lost connection.
 *  - new-protocol frontend: a newline is appended and the buffer is sent as
 *    one message of type d (CopyData); the pq_putmessage result is
 *    deliberately ignored.
 *  - one remaining case is marked not yet supported and asserts.
 *
 * NOTE(review): the conditions that decide whether a terminator is appended
 * at all are not visible here - confirm against the full source.
 *
 * In every case the buffer is emptied with resetStringInfo at the end.
 */
465 CopySendEndOfRow(CopyState cstate)
467 StringInfo fe_msgbuf = cstate->fe_msgbuf;
469 switch (cstate->copy_dest)
474 /* Default line termination depends on platform */
476 CopySendChar(cstate, '\n');
478 CopySendString(cstate, "\r\n");
482 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
483 cstate->copy_file) != 1 ||
484 ferror(cstate->copy_file))
486 if (cstate->is_program)
491 * The pipe will be closed automatically on error at
492 * the end of transaction, but we might get a better
493 * error message from the subprocess' exit code than
496 ClosePipeToProgram(cstate);
499 * If ClosePipeToProgram() didn't throw an error, the
500 * program terminated normally, but closed the pipe
501 * first. Restore errno, and throw an error.
506 (errcode_for_file_access(),
507 errmsg("could not write to COPY program: %m")));
511 (errcode_for_file_access(),
512 errmsg("could not write to COPY file: %m")));
516 /* The FE/BE protocol uses \n as newline for all platforms */
518 CopySendChar(cstate, '\n');
520 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
522 /* no hope of recovering connection sync, so FATAL */
524 (errcode(ERRCODE_CONNECTION_FAILURE),
525 errmsg("connection lost during COPY to stdout")));
529 /* The FE/BE protocol uses \n as newline for all platforms */
531 CopySendChar(cstate, '\n');
533 /* Dump the accumulated row as one CopyData message */
534 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
537 Assert(false); /* Not yet supported. */
541 resetStringInfo(fe_msgbuf);
545 * CopyGetData reads data from the source (file or frontend)
547 * We attempt to read at least minread, and at most maxread, bytes from
548 * the source. The actual number of bytes read is returned; if this is
549 * less than minread, EOF was detected.
551 * Note: when copying from the frontend, we expect a proper EOF mark per
552 * protocol; if the frontend simply drops the connection, we raise error.
553 * It seems unwise to allow the COPY IN to complete normally in that case.
555 * NB: no data conversion is applied here.
/*
 * CopyGetData: read input for COPY FROM into databuf.
 *
 * cstate   the COPY state; copy_dest selects the source.
 * databuf  destination buffer supplied by the caller.
 * minread  number of bytes the caller wants at least.
 * maxread  number of bytes the caller accepts at most.
 *
 * Per the description preceding this function, the return value is the
 * number of bytes read, and a value below minread means end of input.
 *
 * Branches visible below (case labels are not shown in this listing, so the
 * mapping to CopyDest values is inferred from the calls):
 *  - file: one fread of up to maxread bytes from copy_file; a stream error
 *    is reported as a file-access error.
 *  - old-protocol frontend: exactly minread bytes are requested with
 *    pq_getbytes, for the reason given in the existing comment; failure is
 *    reported as an unexpected EOF on the client connection.
 *  - new-protocol frontend: loops while maxread is positive, fewer than
 *    minread bytes have been gathered and fe_eof is not set.  Whenever
 *    fe_msgbuf is exhausted another message is received, with cancel
 *    interrupts held during the read.  Message type d (CopyData) supplies
 *    data, c (CopyDone) sets fe_eof, f (CopyFail) raises
 *    ERRCODE_QUERY_CANCELED with the text sent by the frontend, Flush and
 *    Sync are ignored, and anything else is a protocol violation.
 *    Available bytes are then copied out of fe_msgbuf and databuf is
 *    advanced past them.
 *  - callback: the read is delegated to cstate->data_source_cb.
 *
 * No data conversion is applied here.
 */
558 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
562 switch (cstate->copy_dest)
565 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
566 if (ferror(cstate->copy_file))
568 (errcode_for_file_access(),
569 errmsg("could not read from COPY file: %m")));
574 * We cannot read more than minread bytes (which in practice is 1)
575 * because old protocol doesn't have any clear way of separating
576 * the COPY stream from following data. This is slow, but not any
577 * slower than the code path was originally, and we don't care
578 * much anymore about the performance of old protocol.
580 if (pq_getbytes((char *) databuf, minread))
582 /* Only a \. terminator is legal EOF in old protocol */
584 (errcode(ERRCODE_CONNECTION_FAILURE),
585 errmsg("unexpected EOF on client connection with an open transaction")));
590 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
594 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
596 /* Try to receive another message */
600 HOLD_CANCEL_INTERRUPTS();
602 mtype = pq_getbyte();
605 (errcode(ERRCODE_CONNECTION_FAILURE),
606 errmsg("unexpected EOF on client connection with an open transaction")));
607 if (pq_getmessage(cstate->fe_msgbuf, 0))
609 (errcode(ERRCODE_CONNECTION_FAILURE),
610 errmsg("unexpected EOF on client connection with an open transaction")));
611 RESUME_CANCEL_INTERRUPTS();
614 case 'd': /* CopyData */
616 case 'c': /* CopyDone */
617 /* COPY IN correctly terminated by frontend */
618 cstate->fe_eof = true;
620 case 'f': /* CopyFail */
622 (errcode(ERRCODE_QUERY_CANCELED),
623 errmsg("COPY from stdin failed: %s",
624 pq_getmsgstring(cstate->fe_msgbuf))));
626 case 'H': /* Flush */
630 * Ignore Flush/Sync for the convenience of client
631 * libraries (such as libpq) that may send those
632 * without noticing that the command they just
638 (errcode(ERRCODE_PROTOCOL_VIOLATION),
639 errmsg("unexpected message type 0x%02X during COPY from stdin",
644 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
647 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
648 databuf = (void *) ((char *) databuf + avail);
654 bytesread = cstate->data_source_cb(databuf, minread, maxread);
663 * These functions do apply some data conversion
667 * CopySendInt32 sends an int32 in network byte order
/*
 * CopySendInt32: queue val for output in network byte order.  The value is
 * converted with htonl and the sizeof(buf) resulting bytes are appended via
 * CopySendData.
 */
670 CopySendInt32(CopyState cstate, int32 val)
674 buf = htonl((uint32) val);
675 CopySendData(cstate, &buf, sizeof(buf));
679 * CopyGetInt32 reads an int32 that appears in network byte order
681 * Returns true if OK, false if EOF
/*
 * CopyGetInt32: read an int32 stored in network byte order.
 *
 * val  output; receives the value converted with ntohl, or 0 when the read
 *      comes up short.
 *
 * Exactly sizeof(buf) bytes are requested from CopyGetData (minread and
 * maxread are equal).  Per the description preceding this function it
 * returns true on success and false on EOF.
 */
684 CopyGetInt32(CopyState cstate, int32 *val)
688 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
690 *val = 0; /* suppress compiler warning */
693 *val = (int32) ntohl(buf);
698 * CopySendInt16 sends an int16 in network byte order
/*
 * CopySendInt16: queue val for output in network byte order.  The value is
 * converted with htons and the sizeof(buf) resulting bytes are appended via
 * CopySendData.
 */
701 CopySendInt16(CopyState cstate, int16 val)
705 buf = htons((uint16) val);
706 CopySendData(cstate, &buf, sizeof(buf));
710 * CopyGetInt16 reads an int16 that appears in network byte order
/*
 * CopyGetInt16: read an int16 stored in network byte order.
 *
 * val  output; receives the value converted with ntohs, or 0 when the read
 *      comes up short.
 *
 * Exactly sizeof(buf) bytes are requested from CopyGetData.
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the result mirrors CopyGetInt32 (true if OK, false on EOF) -
 * confirm.
 */
713 CopyGetInt16(CopyState cstate, int16 *val)
717 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
719 *val = 0; /* suppress compiler warning */
722 *val = (int16) ntohs(buf);
728 * CopyLoadRawBuf loads some more data into raw_buf
730 * Returns TRUE if able to obtain at least one more byte, else FALSE.
732 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
733 * down to the start of the buffer and then we load more data after that.
734 * This case is used only when a frontend multibyte character crosses a
735 * bufferload boundary.
/*
 * CopyLoadRawBuf: refill cstate->raw_buf from the data source.
 *
 * If raw_buf_index is below raw_buf_len, the unprocessed tail is first moved
 * to the start of the buffer with memmove (the regions may overlap) and new
 * data is read in after it; otherwise reading starts at offset 0.  At least
 * 1 and at most RAW_BUF_SIZE - nbytes bytes are requested from CopyGetData.
 * Afterwards the buffer is NUL-terminated at raw_buf[nbytes], raw_buf_index
 * is reset to 0 and raw_buf_len is set to nbytes.
 *
 * Returns whether CopyGetData delivered at least one byte (inbytes > 0).
 *
 * NOTE(review): this listing does not show nbytes being advanced by inbytes
 * before it is used as the terminator position and the new length; that
 * step is presumably on a line omitted from the listing - confirm against
 * the full source.
 */
738 CopyLoadRawBuf(CopyState cstate)
743 if (cstate->raw_buf_index < cstate->raw_buf_len)
745 /* Copy down the unprocessed data */
746 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
747 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
751 nbytes = 0; /* no data need be saved */
753 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
754 1, RAW_BUF_SIZE - nbytes);
756 cstate->raw_buf[nbytes] = '\0';
757 cstate->raw_buf_index = 0;
758 cstate->raw_buf_len = nbytes;
759 return (inbytes > 0);
764 * DoCopy executes the SQL COPY statement
766 * Either unload or reload contents of table <relation>, depending on <from>.
767 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
768 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
771 * If <pipe> is false, transfer is between the table and the file named
772 * <filename>. Otherwise, transfer is between the table and our regular
773 * input/output stream. The latter could be either stdin/stdout or a
774 * socket, depending on whether we're running under Postmaster control.
776 * Do not allow a Postgres user without superuser privilege to read from
777 * or write to a file.
779 * Do not allow the copy if user doesn't have proper permission to access
780 * the table or the specifically requested columns.
/*
 * DoCopy: entry point that executes one COPY statement.
 *
 * pstate         parse state; the range table entry built for the target
 *                relation is added to it, and ExecCheckRTPerms is run on
 *                its range table.
 * stmt           the CopyStmt to execute; relation, query, filename,
 *                is_program, is_from, attlist and options are read.
 * stmt_location,
 * stmt_len       copied into the RawStmt built for a query-based COPY.
 * processed      output, written through a pointer: the row count returned
 *                by CopyFrom or DoCopyTo.  (Its declaration is on a line
 *                not shown in this listing.)
 *
 * Steps visible below:
 *  1. When stmt->filename is not NULL (file or program), the caller must be
 *     superuser; otherwise ERRCODE_INSUFFICIENT_PRIVILEGE is raised, with a
 *     message that differs for programs and files.
 *  2. For a relation, it is opened with RowExclusiveLock for COPY FROM or
 *     AccessShareLock for COPY TO.  A range table entry is created that
 *     requires ACL_INSERT or ACL_SELECT, each column returned by
 *     CopyGetAttnums is recorded in insertedCols or selectedCols, and
 *     permissions are checked.
 *  3. If check_enable_rls returns RLS_ENABLED, the COPY becomes a query:
 *     a SelectStmt over the schema-qualified relation is built, selecting
 *     a star when no column list was given and the listed columns
 *     otherwise.  The relation is then closed with NoLock so the lock is
 *     retained.  This section also holds the error saying COPY FROM is not
 *     supported with row-level security.
 *  4. When the statement carries its own query, it is wrapped in a RawStmt.
 *  5. COPY FROM is prevented in parallel mode, and in a read-only
 *     transaction unless rel->rd_islocaltemp is set; then BeginCopyFrom and
 *     CopyFrom run.  COPY TO runs BeginCopyTo and DoCopyTo.
 *  6. The relation is finally closed, releasing AccessShareLock after a
 *     COPY TO and keeping the lock (NoLock) after a COPY FROM so that it is
 *     held until end of transaction.
 */
783 DoCopy(ParseState *pstate, const CopyStmt *stmt,
784 int stmt_location, int stmt_len,
788 bool is_from = stmt->is_from;
789 bool pipe = (stmt->filename == NULL);
792 RawStmt *query = NULL;
794 /* Disallow COPY to/from file or program except to superusers. */
795 if (!pipe && !superuser())
797 if (stmt->is_program)
799 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
800 errmsg("must be superuser to COPY to or from an external program"),
801 errhint("Anyone can COPY to stdout or from stdin. "
802 "psql's \\copy command also works for anyone.")));
805 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
806 errmsg("must be superuser to COPY to or from a file"),
807 errhint("Anyone can COPY to stdout or from stdin. "
808 "psql's \\copy command also works for anyone.")));
818 Assert(!stmt->query);
820 /* Open and lock the relation, using the appropriate lock type. */
821 rel = heap_openrv(stmt->relation,
822 (is_from ? RowExclusiveLock : AccessShareLock));
824 relid = RelationGetRelid(rel);
826 rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
827 rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
829 tupDesc = RelationGetDescr(rel);
830 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
831 foreach(cur, attnums)
833 int attno = lfirst_int(cur) -
834 FirstLowInvalidHeapAttributeNumber;
837 rte->insertedCols = bms_add_member(rte->insertedCols, attno);
839 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
841 ExecCheckRTPerms(pstate->p_rtable, true);
844 * Permission check for row security policies.
846 * check_enable_rls will ereport(ERROR) if the user has requested
847 * something invalid and will otherwise indicate if we should enable
848 * RLS (returns RLS_ENABLED) or not for this COPY statement.
850 * If the relation has a row security policy and we are to apply it
851 * then perform a "query" copy and allow the normal query processing
852 * to handle the policies.
854 * If RLS is not enabled for this, then just fall through to the
855 * normal non-filtering relation handling.
857 if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
863 List *targetList = NIL;
867 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
868 errmsg("COPY FROM not supported with row-level security"),
869 errhint("Use INSERT statements instead.")));
874 * If no columns are specified in the attribute list of the COPY
875 * command, then the target list is 'all' columns. Therefore, '*'
876 * should be used as the target list for the resulting SELECT
879 * In the case that columns are specified in the attribute list,
880 * create a ColumnRef and ResTarget for each column and add them
881 * to the target list for the resulting SELECT statement.
885 cr = makeNode(ColumnRef);
886 cr->fields = list_make1(makeNode(A_Star));
889 target = makeNode(ResTarget);
891 target->indirection = NIL;
892 target->val = (Node *) cr;
893 target->location = -1;
895 targetList = list_make1(target);
901 foreach(lc, stmt->attlist)
904 * Build the ColumnRef for each column. The ColumnRef
905 * 'fields' property is a String 'Value' node (see
906 * nodes/value.h) that corresponds to the column name
909 cr = makeNode(ColumnRef);
910 cr->fields = list_make1(lfirst(lc));
913 /* Build the ResTarget and add the ColumnRef to it. */
914 target = makeNode(ResTarget);
916 target->indirection = NIL;
917 target->val = (Node *) cr;
918 target->location = -1;
920 /* Add each column to the SELECT statement's target list */
921 targetList = lappend(targetList, target);
926 * Build RangeVar for from clause, fully qualified based on the
927 * relation which we have opened and locked.
929 from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
930 pstrdup(RelationGetRelationName(rel)),
934 select = makeNode(SelectStmt);
935 select->targetList = targetList;
936 select->fromClause = list_make1(from);
938 query = makeNode(RawStmt);
939 query->stmt = (Node *) select;
940 query->stmt_location = stmt_location;
941 query->stmt_len = stmt_len;
944 * Close the relation for now, but keep the lock on it to prevent
945 * changes between now and when we start the query-based COPY.
947 * We'll reopen it later as part of the query-based COPY.
949 heap_close(rel, NoLock);
957 query = makeNode(RawStmt);
958 query->stmt = stmt->query;
959 query->stmt_location = stmt_location;
960 query->stmt_len = stmt_len;
970 /* check read-only transaction and parallel mode */
971 if (XactReadOnly && !rel->rd_islocaltemp)
972 PreventCommandIfReadOnly("COPY FROM");
973 PreventCommandIfParallelMode("COPY FROM");
975 cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
976 NULL, stmt->attlist, stmt->options);
977 *processed = CopyFrom(cstate); /* copy from file to database */
982 cstate = BeginCopyTo(pstate, rel, query, relid,
983 stmt->filename, stmt->is_program,
984 stmt->attlist, stmt->options);
985 *processed = DoCopyTo(cstate); /* copy from database to file */
990 * Close the relation. If reading, we can release the AccessShareLock we
991 * got; if writing, we should hold the lock until end of transaction to
992 * ensure that updates will be committed before lock is released.
995 heap_close(rel, (is_from ? NoLock : AccessShareLock));
999 * Process the statement option list for COPY.
1001 * Scan the options list (a list of DefElem) and transpose the information
1002 * into cstate, applying appropriate error checking.
1004 * cstate is assumed to be filled with zeroes initially.
1006 * This is exported so that external users of the COPY API can sanity-check
1007 * a list of options. In that usage, cstate should be passed as NULL
1008 * (since external users don't know sizeof(CopyStateData)) and the collected
1009 * data is just leaked until CurrentMemoryContext is reset.
1011 * Note that additional checking, such as whether column names listed in FORCE
1012 * QUOTE actually exist, has to be applied later. This just checks for
1013 * self-consistency of the options list.
1016 ProcessCopyOptions(ParseState *pstate,
1021 bool format_specified = false;
1024 /* Support external use for option sanity checking */
1026 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1028 cstate->file_encoding = -1;
1030 /* Extract options from the statement node tree */
1031 foreach(option, options)
1033 DefElem *defel = lfirst_node(DefElem, option);
1035 if (strcmp(defel->defname, "format") == 0)
1037 char *fmt = defGetString(defel);
1039 if (format_specified)
1041 (errcode(ERRCODE_SYNTAX_ERROR),
1042 errmsg("conflicting or redundant options"),
1043 parser_errposition(pstate, defel->location)));
1044 format_specified = true;
1045 if (strcmp(fmt, "text") == 0)
1046 /* default format */ ;
1047 else if (strcmp(fmt, "csv") == 0)
1048 cstate->csv_mode = true;
1049 else if (strcmp(fmt, "binary") == 0)
1050 cstate->binary = true;
1053 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1054 errmsg("COPY format \"%s\" not recognized", fmt),
1055 parser_errposition(pstate, defel->location)));
1057 else if (strcmp(defel->defname, "oids") == 0)
1061 (errcode(ERRCODE_SYNTAX_ERROR),
1062 errmsg("conflicting or redundant options"),
1063 parser_errposition(pstate, defel->location)));
1064 cstate->oids = defGetBoolean(defel);
1066 else if (strcmp(defel->defname, "freeze") == 0)
1070 (errcode(ERRCODE_SYNTAX_ERROR),
1071 errmsg("conflicting or redundant options"),
1072 parser_errposition(pstate, defel->location)));
1073 cstate->freeze = defGetBoolean(defel);
1075 else if (strcmp(defel->defname, "delimiter") == 0)
1079 (errcode(ERRCODE_SYNTAX_ERROR),
1080 errmsg("conflicting or redundant options"),
1081 parser_errposition(pstate, defel->location)));
1082 cstate->delim = defGetString(defel);
1084 else if (strcmp(defel->defname, "null") == 0)
1086 if (cstate->null_print)
1088 (errcode(ERRCODE_SYNTAX_ERROR),
1089 errmsg("conflicting or redundant options"),
1090 parser_errposition(pstate, defel->location)));
1091 cstate->null_print = defGetString(defel);
1093 else if (strcmp(defel->defname, "header") == 0)
1095 if (cstate->header_line)
1097 (errcode(ERRCODE_SYNTAX_ERROR),
1098 errmsg("conflicting or redundant options"),
1099 parser_errposition(pstate, defel->location)));
1100 cstate->header_line = defGetBoolean(defel);
1102 else if (strcmp(defel->defname, "quote") == 0)
1106 (errcode(ERRCODE_SYNTAX_ERROR),
1107 errmsg("conflicting or redundant options"),
1108 parser_errposition(pstate, defel->location)));
1109 cstate->quote = defGetString(defel);
1111 else if (strcmp(defel->defname, "escape") == 0)
1115 (errcode(ERRCODE_SYNTAX_ERROR),
1116 errmsg("conflicting or redundant options"),
1117 parser_errposition(pstate, defel->location)));
1118 cstate->escape = defGetString(defel);
1120 else if (strcmp(defel->defname, "force_quote") == 0)
1122 if (cstate->force_quote || cstate->force_quote_all)
1124 (errcode(ERRCODE_SYNTAX_ERROR),
1125 errmsg("conflicting or redundant options"),
1126 parser_errposition(pstate, defel->location)));
1127 if (defel->arg && IsA(defel->arg, A_Star))
1128 cstate->force_quote_all = true;
1129 else if (defel->arg && IsA(defel->arg, List))
1130 cstate->force_quote = castNode(List, defel->arg);
1133 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1134 errmsg("argument to option \"%s\" must be a list of column names",
1136 parser_errposition(pstate, defel->location)));
1138 else if (strcmp(defel->defname, "force_not_null") == 0)
1140 if (cstate->force_notnull)
1142 (errcode(ERRCODE_SYNTAX_ERROR),
1143 errmsg("conflicting or redundant options"),
1144 parser_errposition(pstate, defel->location)));
1145 if (defel->arg && IsA(defel->arg, List))
1146 cstate->force_notnull = castNode(List, defel->arg);
1149 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1150 errmsg("argument to option \"%s\" must be a list of column names",
1152 parser_errposition(pstate, defel->location)));
1154 else if (strcmp(defel->defname, "force_null") == 0)
1156 if (cstate->force_null)
1158 (errcode(ERRCODE_SYNTAX_ERROR),
1159 errmsg("conflicting or redundant options")));
1160 if (defel->arg && IsA(defel->arg, List))
1161 cstate->force_null = castNode(List, defel->arg);
1164 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1165 errmsg("argument to option \"%s\" must be a list of column names",
1167 parser_errposition(pstate, defel->location)));
1169 else if (strcmp(defel->defname, "convert_selectively") == 0)
1172 * Undocumented, not-accessible-from-SQL option: convert only the
1173 * named columns to binary form, storing the rest as NULLs. It's
1174 * allowed for the column list to be NIL.
1176 if (cstate->convert_selectively)
1178 (errcode(ERRCODE_SYNTAX_ERROR),
1179 errmsg("conflicting or redundant options"),
1180 parser_errposition(pstate, defel->location)));
1181 cstate->convert_selectively = true;
1182 if (defel->arg == NULL || IsA(defel->arg, List))
1183 cstate->convert_select = castNode(List, defel->arg);
1186 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1187 errmsg("argument to option \"%s\" must be a list of column names",
1189 parser_errposition(pstate, defel->location)));
1191 else if (strcmp(defel->defname, "encoding") == 0)
1193 if (cstate->file_encoding >= 0)
1195 (errcode(ERRCODE_SYNTAX_ERROR),
1196 errmsg("conflicting or redundant options"),
1197 parser_errposition(pstate, defel->location)));
1198 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1199 if (cstate->file_encoding < 0)
1201 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1202 errmsg("argument to option \"%s\" must be a valid encoding name",
1204 parser_errposition(pstate, defel->location)));
1208 (errcode(ERRCODE_SYNTAX_ERROR),
1209 errmsg("option \"%s\" not recognized",
1211 parser_errposition(pstate, defel->location)));
1215 * Check for incompatible options (must do these two before inserting
1218 if (cstate->binary && cstate->delim)
1220 (errcode(ERRCODE_SYNTAX_ERROR),
1221 errmsg("cannot specify DELIMITER in BINARY mode")));
1223 if (cstate->binary && cstate->null_print)
1225 (errcode(ERRCODE_SYNTAX_ERROR),
1226 errmsg("cannot specify NULL in BINARY mode")));
1228 /* Set defaults for omitted options */
1230 cstate->delim = cstate->csv_mode ? "," : "\t";
1232 if (!cstate->null_print)
1233 cstate->null_print = cstate->csv_mode ? "" : "\\N";
1234 cstate->null_print_len = strlen(cstate->null_print);
1236 if (cstate->csv_mode)
1239 cstate->quote = "\"";
1240 if (!cstate->escape)
1241 cstate->escape = cstate->quote;
1244 /* Only single-byte delimiter strings are supported. */
1245 if (strlen(cstate->delim) != 1)
1247 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1248 errmsg("COPY delimiter must be a single one-byte character")));
1250 /* Disallow end-of-line characters */
1251 if (strchr(cstate->delim, '\r') != NULL ||
1252 strchr(cstate->delim, '\n') != NULL)
1254 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1255 errmsg("COPY delimiter cannot be newline or carriage return")));
1257 if (strchr(cstate->null_print, '\r') != NULL ||
1258 strchr(cstate->null_print, '\n') != NULL)
1260 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1261 errmsg("COPY null representation cannot use newline or carriage return")));
1264 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1265 * backslash because it would be ambiguous. We can't allow the other
1266 * cases because data characters matching the delimiter must be
1267 * backslashed, and certain backslash combinations are interpreted
1268 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1269 * more than strictly necessary, but seems best for consistency and
1270 * future-proofing. Likewise we disallow all digits though only octal
1271 * digits are actually dangerous.
1273 if (!cstate->csv_mode &&
1274 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1275 cstate->delim[0]) != NULL)
1277 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1278 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1281 if (!cstate->csv_mode && cstate->header_line)
1283 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1284 errmsg("COPY HEADER available only in CSV mode")));
1287 if (!cstate->csv_mode && cstate->quote != NULL)
1289 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1290 errmsg("COPY quote available only in CSV mode")));
1292 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1294 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1295 errmsg("COPY quote must be a single one-byte character")));
1297 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1299 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1300 errmsg("COPY delimiter and quote must be different")));
1303 if (!cstate->csv_mode && cstate->escape != NULL)
1305 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1306 errmsg("COPY escape available only in CSV mode")));
1308 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1310 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1311 errmsg("COPY escape must be a single one-byte character")));
1313 /* Check force_quote */
1314 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1316 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1317 errmsg("COPY force quote available only in CSV mode")));
1318 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1320 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1321 errmsg("COPY force quote only available using COPY TO")));
1323 /* Check force_notnull */
1324 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1326 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1327 errmsg("COPY force not null available only in CSV mode")));
1328 if (cstate->force_notnull != NIL && !is_from)
1330 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1331 errmsg("COPY force not null only available using COPY FROM")));
1333 /* Check force_null */
1334 if (!cstate->csv_mode && cstate->force_null != NIL)
1336 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1337 errmsg("COPY force null available only in CSV mode")));
1339 if (cstate->force_null != NIL && !is_from)
1341 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1342 errmsg("COPY force null only available using COPY FROM")));
1344 /* Don't allow the delimiter to appear in the null string. */
1345 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1347 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1348 errmsg("COPY delimiter must not appear in the NULL specification")));
1350 /* Don't allow the CSV quote char to appear in the null string. */
1351 if (cstate->csv_mode &&
1352 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1354 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1355 errmsg("CSV quote character must not appear in the NULL specification")));
1359 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1361 * Iff <binary>, unload or reload in the binary format, as opposed to the
1362 * more wasteful but more robust and portable text format.
1364 * Iff <oids>, unload or reload the format that includes OID information.
1365 * On input, we accept OIDs whether or not the table has an OID column,
1366 * but silently drop them if it does not. On output, we report an error
1367 * if the user asks for OIDs in a table that has none (not providing an
1368 * OID column might seem friendlier, but could seriously confuse programs).
1370 * If in the text format, delimit columns with delimiter <delim> and print
1371 * NULL values as <null_print>.
/*
 * BeginCopy --- review notes
 *
 * NOTE(review): this listing seems to have lost its short lines (braces,
 * the openers of error reports, several declarations and conditions) and
 * every surviving line starts with a stray number.  The notes below cover
 * only what the surviving lines show.
 *
 * Visible sequence:
 *	- zero-allocate a CopyStateData, create cstate->copycontext and switch
 *	  into it;
 *	- hand the option list to ProcessCopyOptions();
 *	- relation case: take the tuple descriptor from cstate->rel, refuse
 *	  OIDS for a table without them, build transition-capture state and,
 *	  for COPY FROM into a partitioned table, tuple-routing state;
 *	- query case: analyze and rewrite a copy of raw_query, refuse rule
 *	  outcomes COPY cannot handle, plan the query, push an updated
 *	  snapshot, attach a DestCopyOut receiver, start the executor and take
 *	  the tuple descriptor from the QueryDesc;
 *	- build attnumlist plus the per-column flag arrays for FORCE_QUOTE,
 *	  FORCE_NOT_NULL, FORCE_NULL and convert_selectively;
 *	- settle file_encoding, need_transcoding and encoding_embeds_ascii,
 *	  default copy_dest to COPY_FILE, and switch back to the memory context
 *	  saved on entry.
 *
 * The test that selects between the relation case and the query case is
 * not among the surviving lines; presumably it checks whether a relation
 * was supplied --- TODO confirm.
 */
1374 BeginCopy(ParseState *pstate,
1385 MemoryContext oldcontext;
1387 /* Allocate workspace and zero all fields */
1388 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1391 * We allocate everything used by a cstate in a new memory context. This
1392 * avoids memory leaks during repeated use of COPY in a query.
1394 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1396 ALLOCSET_DEFAULT_SIZES);
/* The previous context is remembered so the final switch can restore it. */
1398 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1400 /* Extract options from the statement node tree */
1401 ProcessCopyOptions(pstate, cstate, is_from, options);
1403 /* Process the source/target relation or query */
1410 tupDesc = RelationGetDescr(cstate->rel);
1412 /* Don't allow COPY w/ OIDs to or from a table without them */
1413 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1415 (errcode(ERRCODE_UNDEFINED_COLUMN),
1416 errmsg("table \"%s\" does not have OIDs",
1417 RelationGetRelationName(cstate->rel))));
1420 * If there are any triggers with transition tables on the named
1421 * relation, we need to be prepared to capture transition tuples.
1423 cstate->transition_capture = MakeTransitionCaptureState(rel->trigdesc);
1425 /* Initialize state for CopyFrom tuple routing. */
1426 if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1428 PartitionDispatch *partition_dispatch_info;
1429 ResultRelInfo *partitions;
1430 TupleConversionMap **partition_tupconv_maps;
1431 TupleTableSlot *partition_tuple_slot;
/* The routing setup fills these locals; they are copied into cstate below. */
1435 ExecSetupPartitionTupleRouting(rel,
1436 &partition_dispatch_info,
1438 &partition_tupconv_maps,
1439 &partition_tuple_slot,
1440 &num_parted, &num_partitions);
1441 cstate->partition_dispatch_info = partition_dispatch_info;
1442 cstate->num_dispatch = num_parted;
1443 cstate->partitions = partitions;
1444 cstate->num_partitions = num_partitions;
1445 cstate->partition_tupconv_maps = partition_tupconv_maps;
1446 cstate->partition_tuple_slot = partition_tuple_slot;
1449 * If we are capturing transition tuples, they may need to be
1450 * converted from partition format back to partitioned table
1451 * format (this is only ever necessary if a BEFORE trigger
1452 * modifies the tuple).
1454 if (cstate->transition_capture != NULL)
/* One conversion map slot per partition, zero-initialized before filling. */
1458 cstate->transition_tupconv_maps = (TupleConversionMap **)
1459 palloc0(sizeof(TupleConversionMap *) *
1460 cstate->num_partitions);
1461 for (i = 0; i < cstate->num_partitions; ++i)
1463 cstate->transition_tupconv_maps[i] =
1464 convert_tuples_by_name(RelationGetDescr(cstate->partitions[i].ri_RelationDesc),
1465 RelationGetDescr(rel),
1466 gettext_noop("could not convert row type"));
1481 /* Don't allow COPY w/ OIDs from a query */
1484 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1485 errmsg("COPY (query) WITH OIDS is not supported")));
1488 * Run parse analysis and rewrite. Note this also acquires sufficient
1489 * locks on the source table(s).
1491 * Because the parser and planner tend to scribble on their input, we
1492 * make a preliminary copy of the source querytree. This prevents
1493 * problems in the case that the COPY is in a portal or plpgsql
1494 * function and is executed repeatedly. (See also the same hack in
1495 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1497 rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1498 pstate->p_sourcetext, NULL, 0,
1501 /* check that we got back something we can work with */
1502 if (rewritten == NIL)
1505 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1506 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1508 else if (list_length(rewritten) > 1)
1512 /* examine queries to determine which error message to issue */
1513 foreach(lc, rewritten)
1515 Query *q = lfirst_node(Query, lc);
1517 if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1519 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1520 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1521 if (q->querySource == QSRC_NON_INSTEAD_RULE)
1523 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1524 errmsg("DO ALSO rules are not supported for the COPY")));
1528 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1529 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1532 query = linitial_node(Query, rewritten);
1534 /* The grammar allows SELECT INTO, but we don't support that */
1535 if (query->utilityStmt != NULL &&
1536 IsA(query->utilityStmt, CreateTableAsStmt))
1538 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1539 errmsg("COPY (SELECT INTO) is not supported")));
1541 Assert(query->utilityStmt == NULL);
1544 * Similarly the grammar doesn't enforce the presence of a RETURNING
1545 * clause, but this is required here.
1547 if (query->commandType != CMD_SELECT &&
1548 query->returningList == NIL)
1550 Assert(query->commandType == CMD_INSERT ||
1551 query->commandType == CMD_UPDATE ||
1552 query->commandType == CMD_DELETE);
1555 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1556 errmsg("COPY query must have a RETURNING clause")));
1559 /* plan the query */
1560 plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1563 * With row level security and a user using "COPY relation TO", we
1564 * have to convert the "COPY relation TO" to a query-based COPY (eg:
1565 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1566 * in any RLS clauses.
1568 * When this happens, we are passed in the relid of the originally
1569 * found relation (which we have locked). As the planner will look up
1570 * the relation again, we double-check here to make sure it found the
1571 * same one that we have locked.
1573 if (queryRelId != InvalidOid)
1576 * Note that with RLS involved there may be multiple relations,
1577 * and while the one we need is almost certainly first, we don't
1578 * make any guarantees of that in the planner, so check the whole
1579 * list and make sure we find the original relation.
1581 if (!list_member_oid(plan->relationOids, queryRelId))
1583 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1584 errmsg("relation referenced by COPY statement has changed")));
1588 * Use a snapshot with an updated command ID to ensure this query sees
1589 * results of any previously executed queries.
1591 PushCopiedSnapshot(GetActiveSnapshot());
1592 UpdateActiveSnapshotCommandId();
1594 /* Create dest receiver for COPY OUT */
1595 dest = CreateDestReceiver(DestCopyOut);
1596 ((DR_copy *) dest)->cstate = cstate;
1598 /* Create a QueryDesc requesting no output */
1599 cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1600 GetActiveSnapshot(),
1602 dest, NULL, NULL, 0);
1605 * Call ExecutorStart to prepare the plan for execution.
1607 * ExecutorStart computes a result tupdesc for us
1609 ExecutorStart(cstate->queryDesc, 0);
1611 tupDesc = cstate->queryDesc->tupDesc;
1614 /* Generate or convert list of attributes to process */
1615 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1617 num_phys_attrs = tupDesc->natts;
/*
 * Each flag array below has one entry per attribute of tupDesc and is
 * indexed by attnum - 1.  A named column that is not also in attnumlist
 * is reported with ERRCODE_INVALID_COLUMN_REFERENCE.
 */
1619 /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1620 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1621 if (cstate->force_quote_all)
1625 for (i = 0; i < num_phys_attrs; i++)
1626 cstate->force_quote_flags[i] = true;
1628 else if (cstate->force_quote)
1633 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1635 foreach(cur, attnums)
1637 int attnum = lfirst_int(cur);
1639 if (!list_member_int(cstate->attnumlist, attnum))
1641 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1642 errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1643 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1644 cstate->force_quote_flags[attnum - 1] = true;
1648 /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1649 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1650 if (cstate->force_notnull)
1655 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1657 foreach(cur, attnums)
1659 int attnum = lfirst_int(cur);
1661 if (!list_member_int(cstate->attnumlist, attnum))
1663 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1664 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1665 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1666 cstate->force_notnull_flags[attnum - 1] = true;
1670 /* Convert FORCE_NULL name list to per-column flags, check validity */
1671 cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1672 if (cstate->force_null)
1677 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1679 foreach(cur, attnums)
1681 int attnum = lfirst_int(cur);
1683 if (!list_member_int(cstate->attnumlist, attnum))
1685 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1686 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1687 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1688 cstate->force_null_flags[attnum - 1] = true;
1692 /* Convert convert_selectively name list to per-column flags */
1693 if (cstate->convert_selectively)
1698 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1700 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1702 foreach(cur, attnums)
1704 int attnum = lfirst_int(cur);
1706 if (!list_member_int(cstate->attnumlist, attnum))
1708 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1709 errmsg_internal("selected column \"%s\" not referenced by COPY",
1710 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1711 cstate->convert_select_flags[attnum - 1] = true;
1715 /* Use client encoding when ENCODING option is not specified. */
1716 if (cstate->file_encoding < 0)
1717 cstate->file_encoding = pg_get_client_encoding();
1720 * Set up encoding conversion info. Even if the file and server encodings
1721 * are the same, we must apply pg_any_to_server() to validate data in
1722 * multibyte encodings.
1724 cstate->need_transcoding =
1725 (cstate->file_encoding != GetDatabaseEncoding() ||
1726 pg_database_encoding_max_length() > 1);
1727 /* See Multibyte encoding comment above */
1728 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1730 cstate->copy_dest = COPY_FILE; /* default */
/* Leave copycontext: restore the context saved at the first switch. */
1732 MemoryContextSwitchTo(oldcontext);
1738 * Closes the pipe to an external program, checking the pclose() return code.
/*
 * ClosePipeToProgram --- review notes
 *
 * Asserts that cstate->is_program is set, then closes cstate->copy_file
 * with ClosePipeStream() and examines the result:
 *	- a result of -1 is reported with errcode_for_file_access();
 *	- any other nonzero result is reported with
 *	  ERRCODE_EXTERNAL_ROUTINE_EXCEPTION, with wait_result_to_str() of the
 *	  result as the detail.
 *
 * NOTE(review): the report openers (and so the error level) and the
 * argument supplied for the program name are not among the surviving
 * lines of this listing --- confirm against the complete file.
 */
1741 ClosePipeToProgram(CopyState cstate)
1745 Assert(cstate->is_program);
1747 pclose_rc = ClosePipeStream(cstate->copy_file);
/* -1 means the close itself failed; %m expands from errno. */
1748 if (pclose_rc == -1)
1750 (errcode_for_file_access(),
1751 errmsg("could not close pipe to external command: %m")));
/* Any other nonzero value is decoded by wait_result_to_str(). */
1752 else if (pclose_rc != 0)
1754 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1755 errmsg("program \"%s\" failed",
1757 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1761 * Release resources allocated in a cstate for COPY TO/FROM.
/*
 * EndCopy --- review notes
 *
 * Closes whatever cstate was writing to or reading from and then deletes
 * cstate->copycontext:
 *	- when cstate->is_program is set, the pipe is closed through
 *	  ClosePipeToProgram();
 *	- the FreeFile() call is made only when cstate->filename is not NULL,
 *	  and a nonzero result from it is reported as a file-access error.
 *
 * NOTE(review): the branch keyword joining the two cases and anything
 * following the context deletion are not among the surviving lines;
 * presumably the FreeFile() test is the alternative to the program case
 * --- TODO confirm.
 */
1764 EndCopy(CopyState cstate)
1766 if (cstate->is_program)
1768 ClosePipeToProgram(cstate);
/* FreeFile() is skipped entirely when there is no filename. */
1772 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1774 (errcode_for_file_access(),
1775 errmsg("could not close file \"%s\": %m",
1776 cstate->filename)));
/* Deleting the context releases everything allocated inside it. */
1779 MemoryContextDelete(cstate->copycontext);
1784 * Setup CopyState to read tuples from a table or a query for COPY TO.
/*
 * BeginCopyTo --- review notes
 *
 * NOTE(review): short lines (braces, report openers, some conditions and
 * part of the parameter list) are missing from this listing; the notes
 * describe only the surviving lines.
 *
 * Visible sequence:
 *	- a supplied relation whose relkind is not RELKIND_RELATION is
 *	  rejected with ERRCODE_WRONG_OBJECT_TYPE and a message specific to
 *	  view, materialized view, foreign table, sequence, partitioned table
 *	  or other non-table relation;
 *	- BeginCopy() is called with is_from passed as false, after which the
 *	  work continues inside cstate->copycontext;
 *	- with no filename (the pipe flag), is_program must be false and the
 *	  output goes to stdout unless whereToSendOutput is DestRemote;
 *	- otherwise the filename is copied into cstate and either a pipe to
 *	  the command is opened with OpenPipeStream() or a file is opened with
 *	  AllocateFile(), in both cases with PG_BINARY_W;
 *	- for a plain file the path must be absolute, group and other write
 *	  permission bits are masked during creation, and the opened file must
 *	  not turn out to be a directory;
 *	- finally the memory context saved earlier is restored.
 */
1787 BeginCopyTo(ParseState *pstate,
1791 const char *filename,
/* No filename means output is not going to a server-side file or program. */
1797 bool pipe = (filename == NULL);
1798 MemoryContext oldcontext;
1800 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1802 if (rel->rd_rel->relkind == RELKIND_VIEW)
1804 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1805 errmsg("cannot copy from view \"%s\"",
1806 RelationGetRelationName(rel)),
1807 errhint("Try the COPY (SELECT ...) TO variant.")));
1808 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1810 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1811 errmsg("cannot copy from materialized view \"%s\"",
1812 RelationGetRelationName(rel)),
1813 errhint("Try the COPY (SELECT ...) TO variant.")));
1814 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1816 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1817 errmsg("cannot copy from foreign table \"%s\"",
1818 RelationGetRelationName(rel)),
1819 errhint("Try the COPY (SELECT ...) TO variant.")));
/* The sequence case carries no hint, unlike the three cases above. */
1820 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1822 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1823 errmsg("cannot copy from sequence \"%s\"",
1824 RelationGetRelationName(rel))));
1825 else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1827 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1828 errmsg("cannot copy from partitioned table \"%s\"",
1829 RelationGetRelationName(rel)),
1830 errhint("Try the COPY (SELECT ...) TO variant.")));
1833 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1834 errmsg("cannot copy from non-table relation \"%s\"",
1835 RelationGetRelationName(rel))));
/* The second argument (is_from) is false: this is the COPY TO direction. */
1838 cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1840 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1844 Assert(!is_program); /* the grammar does not allow this */
1845 if (whereToSendOutput != DestRemote)
1846 cstate->copy_file = stdout;
/* The copy of filename is made while copycontext is the current context. */
1850 cstate->filename = pstrdup(filename);
1851 cstate->is_program = is_program;
1855 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1856 if (cstate->copy_file == NULL)
1858 (errcode_for_file_access(),
1859 errmsg("could not execute command \"%s\": %m",
1860 cstate->filename)));
1864 mode_t oumask; /* Pre-existing umask value */
1868 * Prevent write to relative path ... too easy to shoot oneself in
1869 * the foot by overwriting a database file ...
1871 if (!is_absolute_path(filename))
1873 (errcode(ERRCODE_INVALID_NAME),
1874 errmsg("relative path not allowed for COPY to file")));
/*
 * Group and other write bits are masked for the file creation below.
 * NOTE(review): the call that puts the saved umask back is not among the
 * surviving lines; confirm it follows AllocateFile() on every path.
 */
1876 oumask = umask(S_IWGRP | S_IWOTH);
1877 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1879 if (cstate->copy_file == NULL)
1881 /* copy errno because ereport subfunctions might change it */
1882 int save_errno = errno;
/* The hint is attached only for ENOENT and EACCES. */
1885 (errcode_for_file_access(),
1886 errmsg("could not open file \"%s\" for writing: %m",
1888 (save_errno == ENOENT || save_errno == EACCES) ?
1889 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1890 "You may want a client-side facility such as psql's \\copy.") : 0));
1893 if (fstat(fileno(cstate->copy_file), &st))
1895 (errcode_for_file_access(),
1896 errmsg("could not stat file \"%s\": %m",
1897 cstate->filename)));
1899 if (S_ISDIR(st.st_mode))
1901 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1902 errmsg("\"%s\" is a directory", cstate->filename)));
1906 MemoryContextSwitchTo(oldcontext);
1912 * This intermediate routine exists mainly to localize the effects of setjmp
1913 * so we don't need to plaster a lot of variables with "volatile".
/*
 * DoCopyTo --- review notes
 *
 * Wraps CopyTo() with the begin and end messages of the copy protocol.
 * fe_copy is true only when there is no filename and whereToSendOutput is
 * DestRemote.
 *
 * NOTE(review): the conditions guarding SendCopyBegin() and SendCopyEnd(),
 * the error-trapping construct around them, and the return statement are
 * not among the surviving lines; presumably both calls are made only when
 * fe_copy is set and the row count from CopyTo() is returned --- TODO
 * confirm against the complete file.
 */
1916 DoCopyTo(CopyState cstate)
1918 bool pipe = (cstate->filename == NULL);
1919 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1925 SendCopyBegin(cstate);
/* CopyTo() does the actual work and yields the row count. */
1927 processed = CopyTo(cstate);
1930 SendCopyEnd(cstate);
1935 * Make sure we turn off old-style COPY OUT mode upon error. It is
1936 * okay to do this in all cases, since it does nothing if the mode is
1939 pq_endcopyout(true);
1948 * Clean up storage and release resources for COPY TO.
/*
 * EndCopyTo --- review notes
 *
 * When cstate carries a QueryDesc, the executor is shut down in the order
 * ExecutorFinish(), ExecutorEnd(), FreeQueryDesc(), and the active
 * snapshot is popped.
 *
 * NOTE(review): the statement that belongs under the final storage
 * cleanup comment is not among the surviving lines; presumably it hands
 * cstate to the common cleanup routine --- TODO confirm.
 */
1951 EndCopyTo(CopyState cstate)
/* A QueryDesc is present only when the COPY source was a query. */
1953 if (cstate->queryDesc != NULL)
1955 /* Close down the query and free resources. */
1956 ExecutorFinish(cstate->queryDesc);
1957 ExecutorEnd(cstate->queryDesc);
1958 FreeQueryDesc(cstate->queryDesc);
/* Pops the active snapshot once the query has been shut down. */
1959 PopActiveSnapshot();
1962 /* Clean up storage */
1967 * Copy from relation or query TO file.
/*
 * CopyTo --- review notes
 *
 * NOTE(review): short lines (braces, several declarations and most branch
 * conditions) are missing from this listing; the notes describe only the
 * surviving lines.
 *
 * Visible sequence:
 *	- take the tuple descriptor from cstate->rel or from the QueryDesc;
 *	- create the per-row buffer fe_msgbuf and look up an output function
 *	  (binary or text flavour) for every column in attnumlist;
 *	- create cstate->rowcontext for per-row allocations;
 *	- binary format: send BinarySignature followed by two int32 fields;
 *	  otherwise convert null_print to the file encoding when transcoding
 *	  is needed and, if header_line is set, send one line of column names
 *	  through CopyAttributeOutCSV();
 *	- relation source: scan the heap with the active snapshot and pass
 *	  every tuple to CopyOneRowTo();
 *	- query source: run the plan and read the row count kept by the
 *	  DR_copy receiver;
 *	- binary format: send a trailer of int16 -1 and flush it;
 *	- delete rowcontext.
 */
1970 CopyTo(CopyState cstate)
1974 Form_pg_attribute *attr;
1979 tupDesc = RelationGetDescr(cstate->rel);
1981 tupDesc = cstate->queryDesc->tupDesc;
1982 attr = tupDesc->attrs;
1983 num_phys_attrs = tupDesc->natts;
1984 cstate->null_print_client = cstate->null_print; /* default */
1986 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1987 cstate->fe_msgbuf = makeStringInfo();
1989 /* Get info about the columns we need to process. */
/*
 * The array has one slot per attribute but is not zeroed; only the slots
 * of columns listed in attnumlist are filled in by the loop.
 */
1990 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1991 foreach(cur, cstate->attnumlist)
1993 int attnum = lfirst_int(cur);
1998 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
2002 getTypeOutputInfo(attr[attnum - 1]->atttypid,
2005 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
2009 * Create a temporary memory context that we can reset once per row to
2010 * recover palloc'd memory. This avoids any problems with leaks inside
2011 * datatype output routines, and should be faster than retail pfree's
2012 * anyway. (We don't need a whole econtext as CopyFrom does.)
2014 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
2016 ALLOCSET_DEFAULT_SIZES);
2020 /* Generate header for a binary copy */
/* Eleven bytes of signature are sent, then two int32 fields. */
2024 CopySendData(cstate, BinarySignature, 11);
2029 CopySendInt32(cstate, tmp);
2030 /* No header extension */
2032 CopySendInt32(cstate, tmp);
2037 * For non-binary copy, we need to convert null_print to file
2038 * encoding, because it will be sent directly with CopySendString.
2040 if (cstate->need_transcoding)
2041 cstate->null_print_client = pg_server_to_any(cstate->null_print,
2042 cstate->null_print_len,
2043 cstate->file_encoding);
2045 /* if a header has been requested send the line */
2046 if (cstate->header_line)
2048 bool hdr_delim = false;
2050 foreach(cur, cstate->attnumlist)
2052 int attnum = lfirst_int(cur);
2056 CopySendChar(cstate, cstate->delim[0]);
2059 colname = NameStr(attr[attnum - 1]->attname);
/* Column names go through the CSV writer with the quoting flag false. */
2061 CopyAttributeOutCSV(cstate, colname, false,
2062 list_length(cstate->attnumlist) == 1);
2065 CopySendEndOfRow(cstate);
2073 HeapScanDesc scandesc;
/* Scratch arrays reused for every tuple of the scan. */
2076 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2077 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2079 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2082 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2084 CHECK_FOR_INTERRUPTS();
2086 /* Deconstruct the tuple ... faster than repeated heap_getattr */
2087 heap_deform_tuple(tuple, tupDesc, values, nulls);
2089 /* Format and send the data */
2090 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2094 heap_endscan(scandesc);
2101 /* run the plan --- the dest receiver will send tuples */
2102 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2103 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2108 /* Generate trailer for a binary copy */
2109 CopySendInt16(cstate, -1);
2110 /* Need to flush out the trailer */
2111 CopySendEndOfRow(cstate);
2114 MemoryContextDelete(cstate->rowcontext);
2120 * Emit one row during CopyTo().
/*
 * CopyOneRowTo --- review notes
 *
 * Formats one row from the values[] and nulls[] arrays and sends it.
 * Work is done inside cstate->rowcontext, which is reset at the start of
 * every call; the caller context is restored before returning.
 *
 * Visible sequence:
 *	- binary format: send the number of columns as int16, and the OID as
 *	  a length word plus value when wanted;
 *	- text format: send the OID as a string when wanted;
 *	- for each column in attnumlist: in text format emit a delimiter
 *	  where needed; a null becomes null_print_client (text) or int32 -1
 *	  (binary); a non-null value goes through the column output function
 *	  and then the CSV or plain text writer, or through the binary send
 *	  function followed by its length and bytes;
 *	- finish with CopySendEndOfRow().
 *
 * NOTE(review): braces and several conditions (the tests for binary mode
 * and OIDs, the delimiter bookkeeping, the null test) are not among the
 * surviving lines.
 */
2123 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2125 bool need_delim = false;
2126 FmgrInfo *out_functions = cstate->out_functions;
2127 MemoryContext oldcontext;
/* Drop allocations left over from the previous row before starting. */
2131 MemoryContextReset(cstate->rowcontext);
2132 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2136 /* Binary per-tuple header */
2137 CopySendInt16(cstate, list_length(cstate->attnumlist));
2138 /* Send OID if wanted --- note attnumlist doesn't include it */
2141 /* Hack --- assume Oid is same size as int32 */
2142 CopySendInt32(cstate, sizeof(int32));
2143 CopySendInt32(cstate, tupleOid);
2148 /* Text format has no per-tuple header, but send OID if wanted */
2149 /* Assume digits don't need any quoting or encoding conversion */
2152 string = DatumGetCString(DirectFunctionCall1(oidout,
2153 ObjectIdGetDatum(tupleOid)));
2154 CopySendString(cstate, string);
/* attnum counts from 1, so the arrays are indexed with attnum - 1. */
2159 foreach(cur, cstate->attnumlist)
2161 int attnum = lfirst_int(cur);
2162 Datum value = values[attnum - 1];
2163 bool isnull = nulls[attnum - 1];
2165 if (!cstate->binary)
2168 CopySendChar(cstate, cstate->delim[0]);
2174 if (!cstate->binary)
2175 CopySendString(cstate, cstate->null_print_client);
2177 CopySendInt32(cstate, -1);
2181 if (!cstate->binary)
2183 string = OutputFunctionCall(&out_functions[attnum - 1],
2185 if (cstate->csv_mode)
2186 CopyAttributeOutCSV(cstate, string,
2187 cstate->force_quote_flags[attnum - 1],
2188 list_length(cstate->attnumlist) == 1);
2190 CopyAttributeOutText(cstate, string);
2196 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
/* The length word excludes the varlena header. */
2198 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2199 CopySendData(cstate, VARDATA(outputbytes),
2200 VARSIZE(outputbytes) - VARHDRSZ);
2205 CopySendEndOfRow(cstate);
2207 MemoryContextSwitchTo(oldcontext);
2212 * error context callback for COPY FROM
2214 * The argument for the error context must be CopyState.
/*
 * CopyFromErrorCallback --- review notes
 *
 * Adds a context line to an error being reported.  arg is cast to
 * CopyState, and every variant names cstate->cur_relname and
 * cstate->cur_lineno.  The surviving lines show two groups of messages:
 *	- a group that never prints data: relation, line and, when
 *	  cur_attname is set, the column name;
 *	- a group that prints data when it can: the length-limited column
 *	  value when both cur_attname and cur_attval are set, a null input
 *	  note when only cur_attname is set, and otherwise the length-limited
 *	  input line, provided line_buf is valid and is either already
 *	  converted or needs no transcoding.
 *
 * NOTE(review): the condition choosing between the two groups is not
 * among the surviving lines; presumably it is the binary-format test ---
 * TODO confirm.  Whether the strings returned by limit_printout_length()
 * are freed afterwards is not visible either.
 */
2217 CopyFromErrorCallback(void *arg)
2219 CopyState cstate = (CopyState) arg;
2223 /* can't usefully display the data */
2224 if (cstate->cur_attname)
2225 errcontext("COPY %s, line %d, column %s",
2226 cstate->cur_relname, cstate->cur_lineno,
2227 cstate->cur_attname);
2229 errcontext("COPY %s, line %d",
2230 cstate->cur_relname, cstate->cur_lineno);
/* From here on the data itself is shown, clipped to a bounded length. */
2234 if (cstate->cur_attname && cstate->cur_attval)
2236 /* error is relevant to a particular column */
2239 attval = limit_printout_length(cstate->cur_attval);
2240 errcontext("COPY %s, line %d, column %s: \"%s\"",
2241 cstate->cur_relname, cstate->cur_lineno,
2242 cstate->cur_attname, attval);
2245 else if (cstate->cur_attname)
2247 /* error is relevant to a particular column, value is NULL */
2248 errcontext("COPY %s, line %d, column %s: null input",
2249 cstate->cur_relname, cstate->cur_lineno,
2250 cstate->cur_attname);
2255 * Error is relevant to a particular line.
2257 * If line_buf still contains the correct line, and it's already
2258 * transcoded, print it. If it's still in a foreign encoding, it's
2259 * quite likely that the error is precisely a failure to do
2260 * encoding conversion (ie, bad data). We dare not try to convert
2261 * it, and at present there's no way to regurgitate it without
2262 * conversion. So we have to punt and just report the line number.
2264 if (cstate->line_buf_valid &&
2265 (cstate->line_buf_converted || !cstate->need_transcoding))
2269 lineval = limit_printout_length(cstate->line_buf.data);
2270 errcontext("COPY %s, line %d: \"%s\"",
2271 cstate->cur_relname, cstate->cur_lineno, lineval);
2276 errcontext("COPY %s, line %d",
2277 cstate->cur_relname, cstate->cur_lineno);
2284 * Make sure we don't print an unreasonable amount of COPY data in a message.
2286 * It would seem a lot easier to just use the sprintf "precision" limit to
2287 * truncate the string. However, some versions of glibc have a bug/misfeature
2288 * that vsnprintf will always fail (return -1) if it is asked to truncate
2289 * a string that contains invalid byte sequences for the current encoding.
2290 * So, do our own truncation. We return a pstrdup'd copy of the input.
/*
 * limit_printout_length --- review notes
 *
 * str is a NUL-terminated string.  When it is at most
 * MAX_COPY_DATA_DISPLAY (100) bytes long a pstrdup copy is returned
 * unchanged.  A longer string is cut with pg_mbcliplen(), which is given
 * the same limit, copied into a fresh palloc buffer and given a trailing
 * three-dot marker.
 *
 * NOTE(review): the declarations of len and res and the final return are
 * not among the surviving lines; presumably res is returned --- TODO
 * confirm.
 */
2293 limit_printout_length(const char *str)
2295 #define MAX_COPY_DATA_DISPLAY 100
2297 int slen = strlen(str);
2301 /* Fast path if definitely okay */
2302 if (slen <= MAX_COPY_DATA_DISPLAY)
2303 return pstrdup(str);
2305 /* Apply encoding-dependent truncation */
2306 len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
2309 * Truncate, and add "..." to show we truncated the input.
/* len + 4 covers the clipped bytes, three dots and the terminating NUL. */
2311 res = (char *) palloc(len + 4);
2312 memcpy(res, str, len);
2313 strcpy(res + len, "...");
2319 * Copy FROM file to relation.
2322 CopyFrom(CopyState cstate)
2328 ResultRelInfo *resultRelInfo;
2329 ResultRelInfo *saved_resultRelInfo = NULL;
2330 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2331 ExprContext *econtext;
2332 TupleTableSlot *myslot;
2333 MemoryContext oldcontext = CurrentMemoryContext;
2335 ErrorContextCallback errcallback;
2336 CommandId mycid = GetCurrentCommandId(true);
2337 int hi_options = 0; /* start with default heap_insert options */
2338 BulkInsertState bistate;
2339 uint64 processed = 0;
2340 bool useHeapMultiInsert;
2341 int nBufferedTuples = 0;
2342 int prev_leaf_part_index = -1;
2344 #define MAX_BUFFERED_TUPLES 1000
2345 HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
2346 Size bufferedTuplesSize = 0;
2347 int firstBufferedLineNo = 0;
2349 Assert(cstate->rel);
2352 * The target must be a plain relation or have an INSTEAD OF INSERT row
2353 * trigger. (Currently, such triggers are only allowed on views, so we
2354 * only hint about them in the view case.)
2356 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2357 cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2358 !(cstate->rel->trigdesc &&
2359 cstate->rel->trigdesc->trig_insert_instead_row))
2361 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2363 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2364 errmsg("cannot copy to view \"%s\"",
2365 RelationGetRelationName(cstate->rel)),
2366 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2367 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2369 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2370 errmsg("cannot copy to materialized view \"%s\"",
2371 RelationGetRelationName(cstate->rel))));
2372 else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2374 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2375 errmsg("cannot copy to foreign table \"%s\"",
2376 RelationGetRelationName(cstate->rel))));
2377 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2379 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2380 errmsg("cannot copy to sequence \"%s\"",
2381 RelationGetRelationName(cstate->rel))));
2384 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2385 errmsg("cannot copy to non-table relation \"%s\"",
2386 RelationGetRelationName(cstate->rel))));
2389 tupDesc = RelationGetDescr(cstate->rel);
2392 * Check to see if we can avoid writing WAL
2394 * If archive logging/streaming is not enabled *and* either
2395 * - table was created in same transaction as this COPY
2396 * - data is being written to relfilenode created in this transaction
2397 * then we can skip writing WAL. It's safe because if the transaction
2398 * doesn't commit, we'll discard the table (or the new relfilenode file).
2399 * If it does commit, we'll have done the heap_sync at the bottom of this
2402 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2403 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2404 * can be cleared before the end of the transaction. The exact case is
2405 * when a relation sets a new relfilenode twice in same transaction, yet
2406 * the second one fails in an aborted subtransaction, e.g.
2415 * Also, if the target file is new-in-transaction, we assume that checking
2416 * FSM for free space is a waste of time, even if we must use WAL because
2417 * of archiving. This could possibly be wrong, but it's unlikely.
2419 * The comments for heap_insert and RelationGetBufferForTuple specify that
2420 * skipping WAL logging is only safe if we ensure that our tuples do not
2421 * go into pages containing tuples from any other transactions --- but this
2422 * must be the case if we have a new table or new relfilenode, so we need
2423 * no additional work to enforce that.
2426 /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2427 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2428 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2430 hi_options |= HEAP_INSERT_SKIP_FSM;
2431 if (!XLogIsNeeded())
2432 hi_options |= HEAP_INSERT_SKIP_WAL;
2436 * Optimize if new relfilenode was created in this subxact or one of its
2437 * committed children and we won't see those rows later as part of an
2438 * earlier scan or command. This ensures that if this subtransaction
2439 * aborts then the frozen rows won't be visible after xact cleanup. Note
2440 * that the stronger test of exactly which subtransaction created it is
2441 * crucial for correctness of this optimization.
2445 if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2447 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2448 errmsg("cannot perform FREEZE because of prior transaction activity")));
2450 if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2451 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2453 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2454 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2456 hi_options |= HEAP_INSERT_FROZEN;
2460 * We need a ResultRelInfo so we can use the regular executor's
2461 * index-entry-making machinery. (There used to be a huge amount of code
2462 * here that basically duplicated execUtils.c ...)
2464 resultRelInfo = makeNode(ResultRelInfo);
2465 InitResultRelInfo(resultRelInfo,
2467 1, /* dummy rangetable index */
2471 ExecOpenIndices(resultRelInfo, false);
2473 estate->es_result_relations = resultRelInfo;
2474 estate->es_num_result_relations = 1;
2475 estate->es_result_relation_info = resultRelInfo;
2476 estate->es_range_table = cstate->range_table;
2478 /* Set up a tuple slot too */
2479 myslot = ExecInitExtraTupleSlot(estate);
2480 ExecSetSlotDescriptor(myslot, tupDesc);
2481 /* Triggers might need a slot as well */
2482 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2485 * It's more efficient to prepare a bunch of tuples for insertion, and
2486 * insert them in one heap_multi_insert() call, than call heap_insert()
2487 * separately for every tuple. However, we can't do that if there are
2488 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2489 * expressions. Such triggers or expressions might query the table we're
2490 * inserting to, and act differently if the tuples that have already been
2491 * processed and prepared for insertion are not there. We also can't do
2492 * it if the table is partitioned.
2494 if ((resultRelInfo->ri_TrigDesc != NULL &&
2495 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2496 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2497 cstate->partition_dispatch_info != NULL ||
2498 cstate->volatile_defexprs)
2500 useHeapMultiInsert = false;
2504 useHeapMultiInsert = true;
2505 bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2508 /* Prepare to catch AFTER triggers. */
2509 AfterTriggerBeginQuery();
2512 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2513 * should do this for COPY, since it's not really an "INSERT" statement as
2514 * such. However, executing these triggers maintains consistency with the
2515 * EACH ROW triggers that we already fire on COPY.
2517 ExecBSInsertTriggers(estate, resultRelInfo);
2519 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2520 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2522 bistate = GetBulkInsertState();
2523 econtext = GetPerTupleExprContext(estate);
2525 /* Set up callback to identify error line number */
2526 errcallback.callback = CopyFromErrorCallback;
2527 errcallback.arg = (void *) cstate;
2528 errcallback.previous = error_context_stack;
2529 error_context_stack = &errcallback;
2533 TupleTableSlot *slot;
2535 Oid loaded_oid = InvalidOid;
2537 CHECK_FOR_INTERRUPTS();
2539 if (nBufferedTuples == 0)
2542 * Reset the per-tuple exprcontext. We can only do this if the
2543 * tuple buffer is empty. (Calling the context the per-tuple
2544 * memory context is a bit of a misnomer now.)
2546 ResetPerTupleExprContext(estate);
2549 /* Switch into its memory context */
2550 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2552 if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2555 /* And now we can form the input tuple. */
2556 tuple = heap_form_tuple(tupDesc, values, nulls);
2558 if (loaded_oid != InvalidOid)
2559 HeapTupleSetOid(tuple, loaded_oid);
2562 * Constraints might reference the tableoid column, so initialize
2563 * t_tableOid before evaluating them.
2565 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2567 /* Triggers and stuff need to be invoked in query context. */
2568 MemoryContextSwitchTo(oldcontext);
2570 /* Place tuple in tuple slot --- but slot shouldn't free it */
2572 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2574 /* Determine the partition to heap_insert the tuple into */
2575 if (cstate->partition_dispatch_info)
2577 int leaf_part_index;
2578 TupleConversionMap *map;
2581 * Away we go ... If we end up not finding a partition after all,
2582 * ExecFindPartition() does not return and errors out instead.
2583 * Otherwise, the returned value is to be used as an index into
2584 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
2585 * will get us the ResultRelInfo and TupleConversionMap for the
2586 * partition, respectively.
2588 leaf_part_index = ExecFindPartition(resultRelInfo,
2589 cstate->partition_dispatch_info,
2592 Assert(leaf_part_index >= 0 &&
2593 leaf_part_index < cstate->num_partitions);
2596 * If this tuple is mapped to a partition that is not same as the
2597 * previous one, we'd better make sure the bulk insert mechanism gets a
2600 if (prev_leaf_part_index != leaf_part_index)
2602 ReleaseBulkInsertStatePin(bistate);
2603 prev_leaf_part_index = leaf_part_index;
2607 * Save the old ResultRelInfo and switch to the one corresponding
2608 * to the selected partition.
2610 saved_resultRelInfo = resultRelInfo;
2611 resultRelInfo = cstate->partitions + leaf_part_index;
2613 /* We do not yet have a way to insert into a foreign partition */
2614 if (resultRelInfo->ri_FdwRoutine)
2616 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2617 errmsg("cannot route inserted tuples to a foreign table")));
2620 * For ExecInsertIndexTuples() to work on the partition's indexes
2622 estate->es_result_relation_info = resultRelInfo;
2625 * If we're capturing transition tuples, we might need to convert
2626 * from the partition rowtype to parent rowtype.
2628 if (cstate->transition_capture != NULL)
2630 if (resultRelInfo->ri_TrigDesc &&
2631 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2632 resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
2635 * If there are any BEFORE or INSTEAD triggers on the
2636 * partition, we'll have to be ready to convert their
2637 * result back to tuplestore format.
2639 cstate->transition_capture->tcs_original_insert_tuple = NULL;
2640 cstate->transition_capture->tcs_map =
2641 cstate->transition_tupconv_maps[leaf_part_index];
2646 * Otherwise, just remember the original unconverted
2647 * tuple, to avoid a needless round trip conversion.
2649 cstate->transition_capture->tcs_original_insert_tuple = tuple;
2650 cstate->transition_capture->tcs_map = NULL;
2654 * We might need to convert from the parent rowtype to the
2655 * partition rowtype.
2657 map = cstate->partition_tupconv_maps[leaf_part_index];
2660 Relation partrel = resultRelInfo->ri_RelationDesc;
2662 tuple = do_convert_tuple(tuple, map);
2665 * We must use the partition's tuple descriptor from this
2666 * point on. Use a dedicated slot from this point on until
2667 * we're finished dealing with the partition.
2669 slot = cstate->partition_tuple_slot;
2670 Assert(slot != NULL);
2671 ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
2672 ExecStoreTuple(tuple, slot, InvalidBuffer, true);
2675 tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2680 /* BEFORE ROW INSERT Triggers */
2681 if (resultRelInfo->ri_TrigDesc &&
2682 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2684 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2686 if (slot == NULL) /* "do nothing" */
2688 else /* trigger might have changed tuple */
2689 tuple = ExecMaterializeSlot(slot);
2694 if (resultRelInfo->ri_TrigDesc &&
2695 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2697 /* Pass the data to the INSTEAD ROW INSERT trigger */
2698 ExecIRInsertTriggers(estate, resultRelInfo, slot);
2703 * We always check the partition constraint, including when
2704 * the tuple got here via tuple-routing. However we don't
2705 * need to in the latter case if no BR trigger is defined on
2706 * the partition. Note that a BR trigger might modify the
2707 * tuple such that the partition constraint is no longer
2708 * satisfied, so we need to check in that case.
2710 bool check_partition_constr =
2711 (resultRelInfo->ri_PartitionCheck != NIL);
2713 if (saved_resultRelInfo != NULL &&
2714 !(resultRelInfo->ri_TrigDesc &&
2715 resultRelInfo->ri_TrigDesc->trig_insert_before_row))
2716 check_partition_constr = false;
2718 /* Check the constraints of the tuple */
2719 if (cstate->rel->rd_att->constr || check_partition_constr)
2720 ExecConstraints(resultRelInfo, slot, estate);
2722 if (useHeapMultiInsert)
2724 /* Add this tuple to the tuple buffer */
2725 if (nBufferedTuples == 0)
2726 firstBufferedLineNo = cstate->cur_lineno;
2727 bufferedTuples[nBufferedTuples++] = tuple;
2728 bufferedTuplesSize += tuple->t_len;
2731 * If the buffer filled up, flush it. Also flush if the
2732 * total size of all the tuples in the buffer becomes
2733 * large, to avoid using large amounts of memory for the
2734 * buffer when the tuples are exceptionally wide.
2736 if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2737 bufferedTuplesSize > 65535)
2739 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2740 resultRelInfo, myslot, bistate,
2741 nBufferedTuples, bufferedTuples,
2742 firstBufferedLineNo);
2743 nBufferedTuples = 0;
2744 bufferedTuplesSize = 0;
2749 List *recheckIndexes = NIL;
2751 /* OK, store the tuple and create index entries for it */
2752 heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
2753 hi_options, bistate);
2755 if (resultRelInfo->ri_NumIndices > 0)
2756 recheckIndexes = ExecInsertIndexTuples(slot,
2763 /* AFTER ROW INSERT Triggers */
2764 ExecARInsertTriggers(estate, resultRelInfo, tuple,
2765 recheckIndexes, cstate->transition_capture);
2767 list_free(recheckIndexes);
2772 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2773 * this is the same definition used by execMain.c for counting
2774 * tuples inserted by an INSERT command.
2778 if (saved_resultRelInfo)
2780 resultRelInfo = saved_resultRelInfo;
2781 estate->es_result_relation_info = resultRelInfo;
2786 /* Flush any remaining buffered tuples */
2787 if (nBufferedTuples > 0)
2788 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2789 resultRelInfo, myslot, bistate,
2790 nBufferedTuples, bufferedTuples,
2791 firstBufferedLineNo);
2793 /* Done, clean up */
2794 error_context_stack = errcallback.previous;
2796 FreeBulkInsertState(bistate);
2798 MemoryContextSwitchTo(oldcontext);
2801 * In the old protocol, tell pqcomm that we can process normal protocol
2804 if (cstate->copy_dest == COPY_OLD_FE)
2807 /* Execute AFTER STATEMENT insertion triggers */
2808 ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
2810 /* Handle queued AFTER triggers */
2811 AfterTriggerEndQuery(estate);
2816 ExecResetTupleTable(estate->es_tupleTable, false);
2818 ExecCloseIndices(resultRelInfo);
2820 /* Close all the partitioned tables, leaf partitions, and their indices */
2821 if (cstate->partition_dispatch_info)
2826 * Remember cstate->partition_dispatch_info[0] corresponds to the root
2827 * partitioned table, which we must not try to close, because it is
2828 * the main target table of COPY that will be closed eventually by
2829 * DoCopy(). Also, tupslot is NULL for the root partitioned table.
2831 for (i = 1; i < cstate->num_dispatch; i++)
2833 PartitionDispatch pd = cstate->partition_dispatch_info[i];
2835 heap_close(pd->reldesc, NoLock);
2836 ExecDropSingleTupleTableSlot(pd->tupslot);
2838 for (i = 0; i < cstate->num_partitions; i++)
2840 ResultRelInfo *resultRelInfo = cstate->partitions + i;
2842 ExecCloseIndices(resultRelInfo);
2843 heap_close(resultRelInfo->ri_RelationDesc, NoLock);
2846 /* Release the standalone partition tuple descriptor */
2847 ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
2850 /* Close any trigger target relations */
2851 ExecCleanUpTriggerState(estate);
2853 FreeExecutorState(estate);
2856 * If we skipped writing WAL, then we need to sync the heap (but not
2857 * indexes since those use WAL anyway)
2859 if (hi_options & HEAP_INSERT_SKIP_WAL)
2860 heap_sync(cstate->rel);
/*
 * Reviewer notes for CopyFromInsertBatch.
 *
 * Writes one batch of buffered tuples with a single heap_multi_insert() call
 * on cstate->rel, made while switched into the per-tuple memory context of
 * estate.  Afterwards each of the nBufferedTuples tuples is post-processed:
 *   - if resultRelInfo has indexes, the tuple is stored in myslot, index
 *     entries are created and ExecARInsertTriggers() is called;
 *   - otherwise, if the trigger descriptor has after-row or new-table insert
 *     triggers, only ExecARInsertTriggers() is called (with NIL as the
 *     recheck list).
 * While tuple i is post-processed, cstate->cur_lineno is set to
 * firstBufferedLineNo + i; the value it had on entry is restored at the end.
 *
 * NOTE(review): this listing omits some short lines (braces, the loop index
 * declaration, part of the heap_multi_insert argument list, the assignment
 * target of ExecInsertIndexTuples); the notes cover only what is visible.
 */
2866 * A subroutine of CopyFrom, to write the current batch of buffered heap
2867 * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2871 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2872 int hi_options, ResultRelInfo *resultRelInfo,
2873 TupleTableSlot *myslot, BulkInsertState bistate,
2874 int nBufferedTuples, HeapTuple *bufferedTuples,
2875 int firstBufferedLineNo)
2877 MemoryContext oldcontext;
2879 int save_cur_lineno;
2882 * Print error context information correctly, if one of the operations
/* Flag line_buf as not valid and remember the line number for the restore at the end. */
2885 cstate->line_buf_valid = false;
2886 save_cur_lineno = cstate->cur_lineno;
2889 * heap_multi_insert leaks memory, so switch to short-lived memory context
2890 * before calling it.
2892 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2893 heap_multi_insert(cstate->rel,
2899 MemoryContextSwitchTo(oldcontext);
2902 * If there are any indexes, update them for all the inserted tuples, and
2903 * run AFTER ROW INSERT triggers.
2905 if (resultRelInfo->ri_NumIndices > 0)
2907 for (i = 0; i < nBufferedTuples; i++)
2909 List *recheckIndexes;
/* Attribute anything reported from here on to the input line of tuple i. */
2911 cstate->cur_lineno = firstBufferedLineNo + i;
/* Last argument is false, as in CopyFrom's "slot shouldn't free it" store. */
2912 ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2914 ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2915 estate, false, NULL, NIL);
2916 ExecARInsertTriggers(estate, resultRelInfo,
2918 recheckIndexes, cstate->transition_capture);
2919 list_free(recheckIndexes);
2924 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
2927 else if (resultRelInfo->ri_TrigDesc != NULL &&
2928 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
2929 resultRelInfo->ri_TrigDesc->trig_insert_new_table))
2931 for (i = 0; i < nBufferedTuples; i++)
/* Same line-number bookkeeping as in the indexed path above. */
2933 cstate->cur_lineno = firstBufferedLineNo + i;
2934 ExecARInsertTriggers(estate, resultRelInfo,
2936 NIL, cstate->transition_capture);
2940 /* reset cur_lineno to where we were */
2941 cstate->cur_lineno = save_cur_lineno;
/*
 * Reviewer notes for BeginCopyFrom.
 *
 * Creates the CopyState through BeginCopy() and, while switched into
 * cstate->copycontext, prepares everything needed to read rows:
 *   - resets the line/attribute tracking fields and allocates the attribute,
 *     line and raw input buffers;
 *   - for every non-dropped column, looks up the input function (binary or
 *     text variant) and, for columns that are not in cstate->attnumlist,
 *     plans and initializes the column default expression, noting whether
 *     any default is volatile (other than nextval);
 *   - selects the data source: a callback, the frontend or stdin, a program
 *     opened with OpenPipeStream(), or a file opened with AllocateFile();
 *   - for binary input, reads and validates the file header;
 *   - for text/CSV input, allocates the raw_fields workspace.
 * The memory context that was current on entry is restored near the end.
 *
 * NOTE(review): this listing omits a number of short lines (braces, some
 * parameters and declarations, several branch conditions), so these notes
 * describe only the statements that are visible.
 */
2945 * Setup to read tuples from a file for COPY FROM.
2947 * 'rel': Used as a template for the tuples
2948 * 'filename': Name of server-local file to read
2949 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2950 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2952 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2955 BeginCopyFrom(ParseState *pstate,
2957 const char *filename,
2959 copy_data_source_cb data_source_cb,
/* pipe is true when no filename was supplied. */
2964 bool pipe = (filename == NULL);
2966 Form_pg_attribute *attr;
2967 AttrNumber num_phys_attrs,
2969 FmgrInfo *in_functions;
2974 ExprState **defexprs;
2975 MemoryContext oldcontext;
2976 bool volatile_defexprs;
/* Build the common COPY state, then work inside its own memory context. */
2978 cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
2979 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2981 /* Initialize state variables */
2982 cstate->fe_eof = false;
2983 cstate->eol_type = EOL_UNKNOWN;
2984 cstate->cur_relname = RelationGetRelationName(cstate->rel);
2985 cstate->cur_lineno = 0;
2986 cstate->cur_attname = NULL;
2987 cstate->cur_attval = NULL;
2989 /* Set up variables to avoid per-attribute overhead. */
2990 initStringInfo(&cstate->attribute_buf);
2991 initStringInfo(&cstate->line_buf);
2992 cstate->line_buf_converted = false;
2993 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2994 cstate->raw_buf_index = cstate->raw_buf_len = 0;
2996 /* Assign range table, we'll need it in CopyFrom. */
2998 cstate->range_table = pstate->p_rtable;
3000 tupDesc = RelationGetDescr(cstate->rel);
3001 attr = tupDesc->attrs;
3002 num_phys_attrs = tupDesc->natts;
3004 volatile_defexprs = false;
3007 * Pick up the required catalog information for each attribute in the
3008 * relation, including the input function, the element type (to pass to
3009 * the input function), and info about defaults and constraints. (Which
3010 * input function we use depends on text/binary format choice.)
/* All four arrays are sized by the number of physical attributes. */
3012 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
3013 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
3014 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
3015 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
/* attnum is 1-based; the arrays are indexed with attnum - 1. */
3017 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
3019 /* We don't need info for dropped attributes */
3020 if (attr[attnum - 1]->attisdropped)
3023 /* Fetch the input function and typioparam info */
3025 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
3026 &in_func_oid, &typioparams[attnum - 1]);
3028 getTypeInputInfo(attr[attnum - 1]->atttypid,
3029 &in_func_oid, &typioparams[attnum - 1]);
3030 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
3032 /* Get default info if needed */
3033 if (!list_member_int(cstate->attnumlist, attnum))
3035 /* attribute is NOT to be copied from input */
3036 /* use default value if one exists */
3037 Expr *defexpr = (Expr *) build_column_default(cstate->rel,
3040 if (defexpr != NULL)
3042 /* Run the expression through planner */
3043 defexpr = expression_planner(defexpr);
3045 /* Initialize executable expression in copycontext */
3046 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
/* defmap records the 0-based column that default number num_defaults feeds. */
3047 defmap[num_defaults] = attnum - 1;
3051 * If a default expression looks at the table being loaded,
3052 * then it could give the wrong answer when using
3053 * multi-insert. Since database access can be dynamic this is
3054 * hard to test for exactly, so we use the much wider test of
3055 * whether the default expression is volatile. We allow for
3056 * the special case of when the default expression is the
3057 * nextval() of a sequence which in this specific case is
3058 * known to be safe for use with the multi-insert
3059 * optimization. Hence we use this special case function
3060 * checker rather than the standard check for
3061 * contain_volatile_functions().
/* Once one volatile default has been found the check is not repeated. */
3063 if (!volatile_defexprs)
3064 volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3069 /* We keep those variables in cstate. */
3070 cstate->in_functions = in_functions;
3071 cstate->typioparams = typioparams;
3072 cstate->defmap = defmap;
3073 cstate->defexprs = defexprs;
3074 cstate->volatile_defexprs = volatile_defexprs;
3075 cstate->num_defaults = num_defaults;
3076 cstate->is_program = is_program;
/* NOTE(review): the condition guarding the callback case is not visible here. */
3080 cstate->copy_dest = COPY_CALLBACK;
3081 cstate->data_source_cb = data_source_cb;
/* NOTE(review): branch condition not visible; presumably the no-filename (pipe) case. */
3085 Assert(!is_program); /* the grammar does not allow this */
3086 if (whereToSendOutput == DestRemote)
3087 ReceiveCopyBegin(cstate);
3089 cstate->copy_file = stdin;
/* A filename was given: keep a private copy of it in cstate. */
3093 cstate->filename = pstrdup(filename);
3095 if (cstate->is_program)
/* For a program, the filename string is passed to OpenPipeStream as the command. */
3097 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3098 if (cstate->copy_file == NULL)
3100 (errcode_for_file_access(),
3101 errmsg("could not execute command \"%s\": %m",
3102 cstate->filename)));
3108 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3109 if (cstate->copy_file == NULL)
3111 /* copy errno because ereport subfunctions might change it */
3112 int save_errno = errno;
3115 (errcode_for_file_access(),
3116 errmsg("could not open file \"%s\" for reading: %m",
3118 (save_errno == ENOENT || save_errno == EACCES) ?
3119 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3120 "You may want a client-side facility such as psql's \\copy.") : 0));
/* stat the opened file so that a directory can be rejected below. */
3123 if (fstat(fileno(cstate->copy_file), &st))
3125 (errcode_for_file_access(),
3126 errmsg("could not stat file \"%s\": %m",
3127 cstate->filename)));
3129 if (S_ISDIR(st.st_mode))
3131 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3132 errmsg("\"%s\" is a directory", cstate->filename)));
3136 if (!cstate->binary)
3138 /* must rely on user to tell us... */
3139 cstate->file_has_oids = cstate->oids;
3143 /* Read and verify binary header */
/* The signature is 11 bytes and must match BinarySignature exactly. */
3148 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3149 memcmp(readSig, BinarySignature, 11) != 0)
3151 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3152 errmsg("COPY file signature not recognized")));
3154 if (!CopyGetInt32(cstate, &tmp))
3156 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3157 errmsg("invalid COPY file header (missing flags)")));
/* Bit 16 of the flags word says whether the file carries OIDs. */
3158 cstate->file_has_oids = (tmp & (1 << 16)) != 0;
/* NOTE(review): the line before this check is not visible; presumably it clears bit 16 so only unknown upper bits remain. */
3160 if ((tmp >> 16) != 0)
3162 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3163 errmsg("unrecognized critical flags in COPY file header")));
3164 /* Header extension length */
3165 if (!CopyGetInt32(cstate, &tmp) ||
3168 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3169 errmsg("invalid COPY file header (missing length)")));
3170 /* Skip extension header, if present */
/* Each call reads a single byte into readSig; NOTE(review): the enclosing loop header is not visible. */
3173 if (CopyGetData(cstate, readSig, 1, 1) != 1)
3175 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3176 errmsg("invalid COPY file header (wrong length)")));
/* Binary input with OIDs: look up the binary input function for the oid type. */
3180 if (cstate->file_has_oids && cstate->binary)
3182 getTypeBinaryInputInfo(OIDOID,
3183 &in_func_oid, &cstate->oid_typioparam);
3184 fmgr_info(in_func_oid, &cstate->oid_in_function);
3187 /* create workspace for CopyReadAttributes results */
3188 if (!cstate->binary)
3190 AttrNumber attr_count = list_length(cstate->attnumlist);
/* One slot per listed column, plus one for the OID field when present. */
3191 int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
3193 cstate->max_fields = nfields;
3194 cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
/* Leave copycontext again. */
3197 MemoryContextSwitchTo(oldcontext);
/*
 * Reviewer notes for NextCopyFromRawFields.
 *
 * Text/CSV only (asserted).  On the very first call with header_line set, one
 * line is read and thrown away.  Then the next line is read with
 * CopyReadLine() and split into fields by CopyReadAttributesCSV() or
 * CopyReadAttributesText(), depending on cstate->csv_mode.  *fields is
 * pointed at cstate->raw_fields, so the caller does not own that array.
 * cstate->cur_lineno is incremented for every line read, including the
 * header line.
 *
 * NOTE(review): this listing omits some short lines (braces, local
 * declarations, the final assignment to *nfields and the return statements);
 * the notes cover only what is visible.
 */
3203 * Read raw fields in the next line for COPY FROM in text or csv mode.
3204 * Return false if no more lines.
3206 * An internal temporary buffer is returned via 'fields'. It is valid until
3207 * the next call of the function. Since the function returns all raw fields
3208 * in the input file, 'nfields' could be different from the number of columns
3211 * NOTE: force_not_null option are not applied to the returned fields.
3214 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3219 /* only available for text or csv input */
3220 Assert(!cstate->binary);
3222 /* on input just throw the header line away */
3223 if (cstate->cur_lineno == 0 && cstate->header_line)
3225 cstate->cur_lineno++;
/* CopyReadLine returns true on EOF: input that ends in the header has no rows. */
3226 if (CopyReadLine(cstate))
3227 return false; /* done */
/* Count the line that is about to be read. */
3230 cstate->cur_lineno++;
3232 /* Actually read the line into memory here */
3233 done = CopyReadLine(cstate);
3236 * EOF at start of line means we're done. If we see EOF after some
3237 * characters, we act as though it was newline followed by EOF, ie,
3238 * process the line and then exit loop on next iteration.
3240 if (done && cstate->line_buf.len == 0)
3243 /* Parse the line into de-escaped field values */
3244 if (cstate->csv_mode)
3245 fldct = CopyReadAttributesCSV(cstate);
3247 fldct = CopyReadAttributesText(cstate);
/* Expose the internal field array; it is not copied. */
3249 *fields = cstate->raw_fields;
/*
 * Reviewer notes for NextCopyFrom.
 *
 * Fills values[] and nulls[] for one input row.  Every column starts out as
 * NULL.  Columns listed in cstate->attnumlist are then read from the input:
 * text/CSV fields come from NextCopyFromRawFields() and are converted with
 * InputFunctionCall(); binary fields are converted with
 * CopyReadBinaryAttribute().  Finally the default expressions recorded in
 * cstate->defmap / cstate->defexprs are evaluated with ExecEvalExpr() for the
 * columns that were not supplied by the input.
 *
 * When the input carries an OID it is validated (InvalidOid is rejected) and
 * stored in *tupleOid only if cstate->oids is set and tupleOid is not NULL.
 *
 * Malformed rows are reported with ERRCODE_BAD_COPY_FILE_FORMAT: extra or
 * missing fields, a null or invalid OID, a wrong binary field count, or data
 * following the binary EOF marker.
 *
 * NOTE(review): this listing omits a number of short lines (braces, local
 * declarations, several branch conditions, return statements and call
 * arguments); the notes cover only what is visible.
 */
3255 * Read next tuple from file for COPY FROM. Return false if no more tuples.
3257 * 'econtext' is used to evaluate default expression for each columns not
3258 * read from the file. It can be NULL when no default values are used, i.e.
3259 * when all columns are read from the file.
3261 * 'values' and 'nulls' arrays must be the same length as columns of the
3262 * relation passed to BeginCopyFrom. This function fills the arrays.
3263 * Oid of the tuple is returned with 'tupleOid' separately.
3266 NextCopyFrom(CopyState cstate, ExprContext *econtext,
3267 Datum *values, bool *nulls, Oid *tupleOid)
3270 Form_pg_attribute *attr;
3271 AttrNumber num_phys_attrs,
3273 num_defaults = cstate->num_defaults;
3274 FmgrInfo *in_functions = cstate->in_functions;
3275 Oid *typioparams = cstate->typioparams;
3279 bool file_has_oids = cstate->file_has_oids;
3280 int *defmap = cstate->defmap;
3281 ExprState **defexprs = cstate->defexprs;
3283 tupDesc = RelationGetDescr(cstate->rel);
3284 attr = tupDesc->attrs;
3285 num_phys_attrs = tupDesc->natts;
3286 attr_count = list_length(cstate->attnumlist);
/* Expected number of input fields: one per listed column, plus one for an OID. */
3287 nfields = file_has_oids ? (attr_count + 1) : attr_count;
3289 /* Initialize all values for row to NULL */
3290 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3291 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3293 if (!cstate->binary)
3295 char **field_strings;
3301 /* read raw fields in the next line */
3302 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3305 /* check for overflowing fields */
/* The check is skipped when no fields are expected at all (nfields == 0). */
3306 if (nfields > 0 && fldct > nfields)
3308 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3309 errmsg("extra data after last expected column")));
3313 /* Read the OID field if present */
3316 if (fieldno >= fldct)
3318 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3319 errmsg("missing data for OID column")));
3320 string = field_strings[fieldno++];
3324 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3325 errmsg("null OID in COPY data")));
3326 else if (cstate->oids && tupleOid != NULL)
/* cur_attname / cur_attval are set around the conversion and cleared after it. */
3328 cstate->cur_attname = "oid";
3329 cstate->cur_attval = string;
3330 *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
3331 CStringGetDatum(string)));
3332 if (*tupleOid == InvalidOid)
3334 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3335 errmsg("invalid OID in COPY data")));
3336 cstate->cur_attname = NULL;
3337 cstate->cur_attval = NULL;
3341 /* Loop to read the user attributes on the line. */
3342 foreach(cur, cstate->attnumlist)
3344 int attnum = lfirst_int(cur);
/* m indexes attr[], values[] and the per-column flag arrays; its assignment is not visible here. */
3347 if (fieldno >= fldct)
3349 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3350 errmsg("missing data for column \"%s\"",
3351 NameStr(attr[m]->attname))));
3352 string = field_strings[fieldno++];
3354 if (cstate->convert_select_flags &&
3355 !cstate->convert_select_flags[m])
3357 /* ignore input field, leaving column as NULL */
3361 if (cstate->csv_mode)
3363 if (string == NULL &&
3364 cstate->force_notnull_flags[m])
3367 * FORCE_NOT_NULL option is set and column is NULL -
3368 * convert it to the NULL string.
3370 string = cstate->null_print;
3372 else if (string != NULL && cstate->force_null_flags[m]
3373 && strcmp(string, cstate->null_print) == 0)
3376 * FORCE_NULL option is set and column matches the NULL
3377 * string. It must have been quoted, or otherwise the
3378 * string would already have been set to NULL. Convert it
3379 * to NULL as specified.
3385 cstate->cur_attname = NameStr(attr[m]->attname);
3386 cstate->cur_attval = string;
3387 values[m] = InputFunctionCall(&in_functions[m],
3390 attr[m]->atttypmod);
3393 cstate->cur_attname = NULL;
3394 cstate->cur_attval = NULL;
/* Every expected field must have been consumed by now. */
3397 Assert(fieldno == nfields);
/* NOTE(review): the branch header is not visible; the CopyGetInt16 / CopyReadBinaryAttribute statements that follow presumably form the binary-format path. */
3405 cstate->cur_lineno++;
3407 if (!CopyGetInt16(cstate, &fld_count))
3409 /* EOF detected (end of file, or protocol-level EOF) */
/* A field count of -1 is the end-of-data marker. */
3413 if (fld_count == -1)
3416 * Received EOF marker. In a V3-protocol copy, wait for the
3417 * protocol-level EOF, and complain if it doesn't come
3418 * immediately. This ensures that we correctly handle CopyFail,
3419 * if client chooses to send that now.
3421 * Note that we MUST NOT try to read more data in an old-protocol
3422 * copy, since there is no protocol-level EOF marker then. We
3423 * could go either way for copy from file, but choose to throw
3424 * error if there's data after the EOF marker, for consistency
3425 * with the new-protocol case.
3429 if (cstate->copy_dest != COPY_OLD_FE &&
3430 CopyGetData(cstate, &dummy, 1, 1) > 0)
3432 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3433 errmsg("received copy data after EOF marker")));
/* The row's field count must equal the number of listed columns. */
3437 if (fld_count != attr_count)
3439 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3440 errmsg("row field count is %d, expected %d",
3441 (int) fld_count, attr_count)));
3447 cstate->cur_attname = "oid";
3449 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
3451 &cstate->oid_in_function,
3452 cstate->oid_typioparam,
3455 if (isnull || loaded_oid == InvalidOid)
3457 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3458 errmsg("invalid OID in COPY data")));
3459 cstate->cur_attname = NULL;
/* The OID is always read and validated, but handed back only on request. */
3460 if (cstate->oids && tupleOid != NULL)
3461 *tupleOid = loaded_oid;
3465 foreach(cur, cstate->attnumlist)
3467 int attnum = lfirst_int(cur);
3470 cstate->cur_attname = NameStr(attr[m]->attname);
3472 values[m] = CopyReadBinaryAttribute(cstate,
3478 cstate->cur_attname = NULL;
3483 * Now compute and insert any defaults available for the columns not
3484 * provided by the input data. Anything not processed here or above will
3487 for (i = 0; i < num_defaults; i++)
3490 * The caller must supply econtext and have switched into the
3491 * per-tuple memory context in it.
3493 Assert(econtext != NULL);
3494 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
/* defmap[i] is the 0-based column that default expression i belongs to. */
3496 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
/*
 * Reviewer notes for EndCopyFrom.
 *
 * Takes the CopyState used for a COPY FROM operation.
 * NOTE(review): the statements of the body are not visible in this listing;
 * going by the comment inside the function, memory is the only COPY FROM
 * specific resource there is to release -- confirm against the full file.
 */
3504 * Clean up storage and release resources for COPY FROM.
3507 EndCopyFrom(CopyState cstate)
3509 /* No COPY FROM related resources except memory. */
/*
 * Reviewer notes for CopyReadLine.
 *
 * Resets cstate->line_buf, lets CopyReadLineText() load the next input line
 * into it, removes the trailing end-of-line sequence according to
 * cstate->eol_type, and, when cstate->need_transcoding is set, converts the
 * line with pg_any_to_server() using cstate->file_encoding.
 * cstate->line_buf_converted is false while this is in progress and is set to
 * true once the buffer is safe to show in error messages.
 *
 * NOTE(review): this listing omits some short lines (braces, local
 * declarations, the case labels of the switch, the return statement); the
 * notes cover only what is visible.
 */
3515 * Read the next input line and stash it in line_buf, with conversion to
3518 * Result is true if read was terminated by EOF, false if terminated
3519 * by newline. The terminating newline or EOF marker is not included
3520 * in the final value of line_buf.
3523 CopyReadLine(CopyState cstate)
/* Start from an empty buffer and mark it valid again. */
3527 resetStringInfo(&cstate->line_buf);
3528 cstate->line_buf_valid = true;
3530 /* Mark that encoding conversion hasn't occurred yet */
3531 cstate->line_buf_converted = false;
3533 /* Parse data and transfer into line_buf */
3534 result = CopyReadLineText(cstate);
3539 * Reached EOF. In protocol version 3, we should ignore anything
3540 * after \. up to the protocol end of copy data. (XXX maybe better
3541 * not to treat \. as special?)
3543 if (cstate->copy_dest == COPY_NEW_FE)
/* Mark raw_buf as fully consumed and keep reloading until CopyLoadRawBuf reports no more data. */
3547 cstate->raw_buf_index = cstate->raw_buf_len;
3548 } while (CopyLoadRawBuf(cstate));
3554 * If we didn't hit EOF, then we must have transferred the EOL marker
3555 * to line_buf along with the data. Get rid of it.
3557 switch (cstate->eol_type)
/* Line ends in '\n': drop one byte and re-terminate the string. */
3560 Assert(cstate->line_buf.len >= 1);
3561 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3562 cstate->line_buf.len--;
3563 cstate->line_buf.data[cstate->line_buf.len] = '\0';
/* Line ends in '\r': drop one byte and re-terminate the string. */
3566 Assert(cstate->line_buf.len >= 1);
3567 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3568 cstate->line_buf.len--;
3569 cstate->line_buf.data[cstate->line_buf.len] = '\0';
/* Line ends in "\r\n": drop two bytes and re-terminate the string. */
3572 Assert(cstate->line_buf.len >= 2);
3573 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3574 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3575 cstate->line_buf.len -= 2;
3576 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3579 /* shouldn't get here */
3585 /* Done reading the line. Convert it to server encoding. */
3586 if (cstate->need_transcoding)
3590 cvt = pg_any_to_server(cstate->line_buf.data,
3591 cstate->line_buf.len,
3592 cstate->file_encoding);
/* line_buf is rewritten only when a different pointer came back. */
3593 if (cvt != cstate->line_buf.data)
3595 /* transfer converted data back to line_buf */
3596 resetStringInfo(&cstate->line_buf);
3597 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3602 /* Now it's safe to use the buffer in error messages */
3603 cstate->line_buf_converted = true;
/*
 * CopyReadLineText
 *
 * Text-mode scanner called by CopyReadLine.  It examines the bytes of
 * cstate->raw_buf to locate the end of the next input line and appends the
 * scanned data to cstate->line_buf.  Only line terminators (CR, LF, CR LF)
 * and the end-of-copy marker (backslash followed by a period) are
 * interpreted here.
 *
 * cstate fields read in the visible code include raw_buf, raw_buf_index,
 * raw_buf_len, csv_mode, quote, escape, eol_type, copy_dest-independent
 * encoding settings (file_encoding, encoding_embeds_ascii).  Fields written
 * include eol_type (learned from the first terminator seen), cur_lineno
 * (bumped for terminators embedded in a quoted CSV field), raw_buf_index
 * and line_buf.
 *
 * The local "result" starts out false and is set to true at the point the
 * in-body comment labels as reporting EOF, i.e. once a well-formed
 * end-of-copy marker has been accepted.
 *
 * In CSV mode the in_quote / last_was_esc flags track quoting so that
 * terminators inside a quoted field are treated as data, and the
 * end-of-copy marker is only considered at the start of a line.
 *
 * A terminator that disagrees with the already-established eol_type, or (in
 * non-CSV mode) a malformed end-of-copy marker, is reported with
 * ERRCODE_BAD_COPY_FILE_FORMAT.
 *
 * NOTE(review): many structural lines of this function (return type, braces,
 * the loop header, several short statements and the final return) are not
 * visible in this excerpt, and the refill macros used below are defined
 * elsewhere; confirm details against the complete file.
 */
3609 * CopyReadLineText - inner loop of CopyReadLine for text mode
3612 CopyReadLineText(CopyState cstate)
3617 bool need_data = false;
3618 bool hit_eof = false;
3619 bool result = false;
3623 bool first_char_in_line = true;
3624 bool in_quote = false,
3625 last_was_esc = false;
3627 char escapec = '\0';
3629 if (cstate->csv_mode)
3631 quotec = cstate->quote[0];
3632 escapec = cstate->escape[0];
3633 /* ignore special escape processing if it's the same as quotec */
3634 if (quotec == escapec)
3638 mblen_str[1] = '\0';
3641 * The objective of this loop is to transfer the entire next input line
3642 * into line_buf. Hence, we only care for detecting newlines (\r and/or
3643 * \n) and the end-of-copy marker (\.).
3645 * In CSV mode, \r and \n inside a quoted field are just part of the data
3646 * value and are put in line_buf. We keep just enough state to know if we
3647 * are currently in a quoted field or not.
3649 * These four characters, and the CSV escape and quote characters, are
3650 * assumed the same in frontend and backend encodings.
3652 * For speed, we try to move data from raw_buf to line_buf in chunks
3653 * rather than one character at a time. raw_buf_ptr points to the next
3654 * character to examine; any characters from raw_buf_index to raw_buf_ptr
3655 * have been determined to be part of the line, but not yet transferred to
3658 * For a little extra speed within the loop, we copy raw_buf and
3659 * raw_buf_len into local variables.
3661 copy_raw_buf = cstate->raw_buf;
3662 raw_buf_ptr = cstate->raw_buf_index;
3663 copy_buf_len = cstate->raw_buf_len;
3671 * Load more data if needed. Ideally we would just force four bytes
3672 * of read-ahead and avoid the many calls to
3673 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3674 * does not allow us to read too far ahead or we might read into the
3675 * next data, so we read-ahead only as far we know we can. One
3676 * optimization would be to read-ahead four byte here if
3677 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3678 * considering the size of the buffer.
3680 if (raw_buf_ptr >= copy_buf_len || need_data)
3685 * Try to read some more data. This will certainly reset
3686 * raw_buf_index to zero, and raw_buf_ptr must go with it.
3688 if (!CopyLoadRawBuf(cstate))
3691 copy_buf_len = cstate->raw_buf_len;
3694 * If we are completely out of data, break out of the loop,
3697 if (copy_buf_len <= 0)
3705 /* OK to fetch a character */
3706 prev_raw_ptr = raw_buf_ptr;
3707 c = copy_raw_buf[raw_buf_ptr++];
3709 if (cstate->csv_mode)
3712 * If character is '\\' or '\r', we may need to look ahead below.
3713 * Force fetch of the next character if we don't already have it.
3714 * We need to do this before changing CSV state, in case one of
3715 * these characters is also the quote or escape character.
3717 * Note: old-protocol does not like forced prefetch, but it's OK
3718 * here since we cannot validly be at EOF.
3720 if (c == '\\' || c == '\r')
3722 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3726 * Dealing with quotes and escapes here is mildly tricky. If the
3727 * quote char is also the escape char, there's no problem - we
3728 * just use the char as a toggle. If they are different, we need
3729 * to ensure that we only take account of an escape inside a
3730 * quoted field and immediately preceding a quote char, and not
3731 * the second in an escape-escape sequence.
3733 if (in_quote && c == escapec)
3734 last_was_esc = !last_was_esc;
3735 if (c == quotec && !last_was_esc)
3736 in_quote = !in_quote;
3738 last_was_esc = false;
3741 * Updating the line count for embedded CR and/or LF chars is
3742 * necessarily a little fragile - this test is probably about the
3743 * best we can do. (XXX it's arguable whether we should do this
3744 * at all --- is cur_lineno a physical or logical count?)
3746 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3747 cstate->cur_lineno++;
3751 if (c == '\r' && (!cstate->csv_mode || !in_quote))
3753 /* Check for \r\n on first line, _and_ handle \r\n. */
3754 if (cstate->eol_type == EOL_UNKNOWN ||
3755 cstate->eol_type == EOL_CRNL)
3758 * If need more data, go back to loop top to load it.
3760 * Note that if we are at EOF, c will wind up as '\0' because
3761 * of the guaranteed pad of raw_buf.
3763 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3766 c = copy_raw_buf[raw_buf_ptr];
3770 raw_buf_ptr++; /* eat newline */
3771 cstate->eol_type = EOL_CRNL; /* in case not set yet */
3775 /* found \r, but no \n */
3776 if (cstate->eol_type == EOL_CRNL)
3778 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3780 errmsg("literal carriage return found in data") :
3781 errmsg("unquoted carriage return found in data"),
3783 errhint("Use \"\\r\" to represent carriage return.") :
3784 errhint("Use quoted CSV field to represent carriage return.")));
3787 * if we got here, it is the first line and we didn't find
3788 * \n, so don't consume the peeked character
3790 cstate->eol_type = EOL_CR;
3793 else if (cstate->eol_type == EOL_NL)
3795 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3797 errmsg("literal carriage return found in data") :
3798 errmsg("unquoted carriage return found in data"),
3800 errhint("Use \"\\r\" to represent carriage return.") :
3801 errhint("Use quoted CSV field to represent carriage return.")));
3802 /* If reach here, we have found the line terminator */
3807 if (c == '\n' && (!cstate->csv_mode || !in_quote))
3809 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3811 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3813 errmsg("literal newline found in data") :
3814 errmsg("unquoted newline found in data"),
3816 errhint("Use \"\\n\" to represent newline.") :
3817 errhint("Use quoted CSV field to represent newline.")));
3818 cstate->eol_type = EOL_NL; /* in case not set yet */
3819 /* If reach here, we have found the line terminator */
3824 * In CSV mode, we only recognize \. alone on a line. This is because
3825 * \. is a valid CSV data value.
3827 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3831 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3832 IF_NEED_REFILL_AND_EOF_BREAK(0);
3835 * get next character
3836 * Note: we do not change c so if it isn't \., we can fall
3837 * through and continue processing for file encoding.
3840 c2 = copy_raw_buf[raw_buf_ptr];
3844 raw_buf_ptr++; /* consume the '.' */
3847 * Note: if we loop back for more data here, it does not
3848 * matter that the CSV state change checks are re-executed; we
3849 * will come back here with no important state changed.
3851 if (cstate->eol_type == EOL_CRNL)
3853 /* Get the next character */
3854 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3855 /* if hit_eof, c2 will become '\0' */
3856 c2 = copy_raw_buf[raw_buf_ptr++];
3860 if (!cstate->csv_mode)
3862 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3863 errmsg("end-of-copy marker does not match previous newline style")));
3865 NO_END_OF_COPY_GOTO;
3867 else if (c2 != '\r')
3869 if (!cstate->csv_mode)
3871 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3872 errmsg("end-of-copy marker corrupt")));
3874 NO_END_OF_COPY_GOTO;
3878 /* Get the next character */
3879 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3880 /* if hit_eof, c2 will become '\0' */
3881 c2 = copy_raw_buf[raw_buf_ptr++];
3883 if (c2 != '\r' && c2 != '\n')
3885 if (!cstate->csv_mode)
3887 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3888 errmsg("end-of-copy marker corrupt")));
3890 NO_END_OF_COPY_GOTO;
3893 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3894 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3895 (cstate->eol_type == EOL_CR && c2 != '\r'))
3898 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3899 errmsg("end-of-copy marker does not match previous newline style")));
3903 * Transfer only the data before the \. into line_buf, then
3904 * discard the data and the \. sequence.
3906 if (prev_raw_ptr > cstate->raw_buf_index)
3907 appendBinaryStringInfo(&cstate->line_buf,
3908 cstate->raw_buf + cstate->raw_buf_index,
3909 prev_raw_ptr - cstate->raw_buf_index);
3910 cstate->raw_buf_index = raw_buf_ptr;
3911 result = true; /* report EOF */
3914 else if (!cstate->csv_mode)
3917 * If we are here, it means we found a backslash followed by
3918 * something other than a period. In non-CSV mode, anything
3919 * after a backslash is special, so we skip over that second
3920 * character too. If we didn't do that \\. would be
3921 * considered an eof-of copy, while in non-CSV mode it is a
3922 * literal backslash followed by a period. In CSV mode,
3923 * backslashes are not special, so we want to process the
3924 * character after the backslash just like a normal character,
3925 * so we don't increment in those cases.
3931 * This label is for CSV cases where \. appears at the start of a
3932 * line, but there is more text after it, meaning it was a data value.
3933 * We are more strict for \. in CSV mode because \. could be a data
3934 * value, while in non-CSV mode, \. cannot be a data value.
3939 * Process all bytes of a multi-byte character as a group.
3941 * We only support multi-byte sequences where the first byte has the
3942 * high-bit set, so as an optimization we can avoid this block
3943 * entirely if it is not set.
3945 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3950 /* All our encodings only read the first byte to get the length */
3951 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
3952 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
3953 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3954 raw_buf_ptr += mblen - 1;
3956 first_char_in_line = false;
3957 } /* end of outer loop */
3960 * Transfer any still-uncopied data to line_buf.
/*
 * GetDecimalFromHex
 *
 * Return the numeric value (0..15) of a single hexadecimal digit.
 *
 * hex: one of '0'..'9', 'a'..'f' or 'A'..'F'.  Callers are expected to have
 *		checked the character with isxdigit() first; for any other input the
 *		result is meaningless.
 *
 * The argument is cast to unsigned char before being handed to the <ctype.h>
 * functions, since passing a negative plain char to them is undefined.
 */
static int
GetDecimalFromHex(char hex)
{
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}
/*
 * CopyReadAttributesText
 *
 * Field splitter for text-format COPY input.  The current line in
 * cstate->line_buf is split on the one-byte delimiter cstate->delim[0].
 * De-escaped field values are written into cstate->attribute_buf, and the
 * start of each value is recorded in cstate->raw_fields[], which is doubled
 * in size with repalloc whenever fieldno reaches max_fields.
 *
 * A field whose raw, pre-de-escaping text has the same length and bytes as
 * cstate->null_print gets a NULL entry in raw_fields[] instead of a string.
 *
 * attribute_buf is enlarged up front, based on line_buf.len, so that it never
 * has to grow while pointers into it are held in raw_fields[].
 *
 * Octal and hexadecimal backslash escapes that yield a zero byte or a byte
 * with the high bit set mark the field via saw_non_ascii; the field is then
 * passed to pg_verifymbstr (presumably only when that flag is set; the
 * guarding condition is not visible here).
 *
 * When max_fields is zero or negative, a non-empty input line is rejected
 * with ERRCODE_BAD_COPY_FILE_FORMAT.
 *
 * NOTE(review): the return type, local declarations, loop headers, most of
 * the escape-handling switch and the final return are not visible in this
 * excerpt; confirm details against the complete file.
 */
3980 * Parse the current line into separate attributes (fields),
3981 * performing de-escaping as needed.
3983 * The input is in line_buf. We use attribute_buf to hold the result
3984 * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3985 * string, or NULL when the input matches the null marker string.
3986 * This array is expanded as necessary.
3988 * (Note that the caller cannot check for nulls since the returned
3989 * string would be the post-de-escaping equivalent, which may look
3990 * the same as some valid data string.)
3992 * delim is the column delimiter string (must be just one byte for now).
3993 * null_print is the null marker string. Note that this is compared to
3994 * the pre-de-escaped input string.
3996 * The return value is the number of fields actually read.
3999 CopyReadAttributesText(CopyState cstate)
4001 char delimc = cstate->delim[0];
4008 * We need a special case for zero-column tables: check that the input
4009 * line is empty, and return.
4011 if (cstate->max_fields <= 0)
4013 if (cstate->line_buf.len != 0)
4015 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4016 errmsg("extra data after last expected column")));
4020 resetStringInfo(&cstate->attribute_buf);
4023 * The de-escaped attributes will certainly not be longer than the input
4024 * data line, so we can just force attribute_buf to be large enough and
4025 * then transfer data without any checks for enough space. We need to do
4026 * it this way because enlarging attribute_buf mid-stream would invalidate
4027 * pointers already stored into cstate->raw_fields[].
4029 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4030 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4031 output_ptr = cstate->attribute_buf.data;
4033 /* set pointer variables for loop */
4034 cur_ptr = cstate->line_buf.data;
4035 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4037 /* Outer loop iterates over fields */
4041 bool found_delim = false;
4045 bool saw_non_ascii = false;
4047 /* Make sure there is enough space for the next value */
4048 if (fieldno >= cstate->max_fields)
4050 cstate->max_fields *= 2;
4051 cstate->raw_fields =
4052 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4055 /* Remember start of field on both input and output sides */
4056 start_ptr = cur_ptr;
4057 cstate->raw_fields[fieldno] = output_ptr;
4060 * Scan data for field.
4062 * Note that in this loop, we are scanning to locate the end of field
4063 * and also speculatively performing de-escaping. Once we find the
4064 * end-of-field, we can match the raw field contents against the null
4065 * marker string. Only after that comparison fails do we know that
4066 * de-escaping is actually the right thing to do; therefore we *must
4067 * not* throw any syntax errors before we've done the null-marker
4075 if (cur_ptr >= line_end_ptr)
4085 if (cur_ptr >= line_end_ptr)
4103 if (cur_ptr < line_end_ptr)
4109 val = (val << 3) + OCTVALUE(c);
4110 if (cur_ptr < line_end_ptr)
4116 val = (val << 3) + OCTVALUE(c);
4122 if (c == '\0' || IS_HIGHBIT_SET(c))
4123 saw_non_ascii = true;
4128 if (cur_ptr < line_end_ptr)
4130 char hexchar = *cur_ptr;
4132 if (isxdigit((unsigned char) hexchar))
4134 int val = GetDecimalFromHex(hexchar);
4137 if (cur_ptr < line_end_ptr)
4140 if (isxdigit((unsigned char) hexchar))
4143 val = (val << 4) + GetDecimalFromHex(hexchar);
4147 if (c == '\0' || IS_HIGHBIT_SET(c))
4148 saw_non_ascii = true;
4172 * in all other cases, take the char after '\'
4178 /* Add c to output string */
4182 /* Check whether raw input matched null marker */
4183 input_len = end_ptr - start_ptr;
4184 if (input_len == cstate->null_print_len &&
4185 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4186 cstate->raw_fields[fieldno] = NULL;
4190 * At this point we know the field is supposed to contain data.
4192 * If we de-escaped any non-7-bit-ASCII chars, make sure the
4193 * resulting string is valid data for the db encoding.
4197 char *fld = cstate->raw_fields[fieldno];
4199 pg_verifymbstr(fld, output_ptr - fld, false);
4203 /* Terminate attribute value in output area */
4204 *output_ptr++ = '\0';
4207 /* Done if we hit EOL instead of a delim */
4212 /* Clean up state of attribute_buf */
4214 Assert(*output_ptr == '\0');
4215 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
/*
 * CopyReadAttributesCSV
 *
 * Field splitter for CSV-format COPY input.  The current line in
 * cstate->line_buf is split on cstate->delim[0], with cstate->quote[0] and
 * cstate->escape[0] governing quoted sections.  Field values are written
 * into cstate->attribute_buf and their start addresses are recorded in
 * cstate->raw_fields[], which is doubled with repalloc whenever fieldno
 * reaches max_fields.
 *
 * Inside a quoted section, an escape character followed by an escape or
 * quote character emits just that following character.  Reaching the end of
 * the line while still inside a quoted section is reported as an
 * unterminated quoted field with ERRCODE_BAD_COPY_FILE_FORMAT.
 *
 * A field is replaced by NULL in raw_fields[] only when no quote was seen in
 * it (saw_quote is false) and its raw text has the same length and bytes as
 * cstate->null_print.
 *
 * attribute_buf is enlarged up front, based on line_buf.len, so that it never
 * has to grow while pointers into it are held in raw_fields[].
 *
 * When max_fields is zero or negative, a non-empty input line is rejected
 * with ERRCODE_BAD_COPY_FILE_FORMAT.
 *
 * NOTE(review): the return type, local declarations, loop headers and
 * several short statements (including the final return) are not visible in
 * this excerpt; confirm details against the complete file.
 */
4221 * Parse the current line into separate attributes (fields),
4222 * performing de-escaping as needed. This has exactly the same API as
4223 * CopyReadAttributesText, except we parse the fields according to
4224 * "standard" (i.e. common) CSV usage.
4227 CopyReadAttributesCSV(CopyState cstate)
4229 char delimc = cstate->delim[0];
4230 char quotec = cstate->quote[0];
4231 char escapec = cstate->escape[0];
4238 * We need a special case for zero-column tables: check that the input
4239 * line is empty, and return.
4241 if (cstate->max_fields <= 0)
4243 if (cstate->line_buf.len != 0)
4245 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4246 errmsg("extra data after last expected column")));
4250 resetStringInfo(&cstate->attribute_buf);
4253 * The de-escaped attributes will certainly not be longer than the input
4254 * data line, so we can just force attribute_buf to be large enough and
4255 * then transfer data without any checks for enough space. We need to do
4256 * it this way because enlarging attribute_buf mid-stream would invalidate
4257 * pointers already stored into cstate->raw_fields[].
4259 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4260 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4261 output_ptr = cstate->attribute_buf.data;
4263 /* set pointer variables for loop */
4264 cur_ptr = cstate->line_buf.data;
4265 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4267 /* Outer loop iterates over fields */
4271 bool found_delim = false;
4272 bool saw_quote = false;
4277 /* Make sure there is enough space for the next value */
4278 if (fieldno >= cstate->max_fields)
4280 cstate->max_fields *= 2;
4281 cstate->raw_fields =
4282 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4285 /* Remember start of field on both input and output sides */
4286 start_ptr = cur_ptr;
4287 cstate->raw_fields[fieldno] = output_ptr;
4290 * Scan data for field,
4292 * The loop starts in "not quote" mode and then toggles between that
4293 * and "in quote" mode. The loop exits normally if it is in "not
4294 * quote" mode and a delimiter or line end is seen.
4304 if (cur_ptr >= line_end_ptr)
4307 /* unquoted field delimiter */
4313 /* start of quoted field (or part of field) */
4319 /* Add c to output string */
4327 if (cur_ptr >= line_end_ptr)
4329 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4330 errmsg("unterminated CSV quoted field")));
4334 /* escape within a quoted field */
4338 * peek at the next char if available, and escape it if it
4339 * is an escape char or a quote char
4341 if (cur_ptr < line_end_ptr)
4343 char nextc = *cur_ptr;
4345 if (nextc == escapec || nextc == quotec)
4347 *output_ptr++ = nextc;
4355 * end of quoted field. Must do this test after testing for
4356 * escape in case quote char and escape char are the same
4357 * (which is the common case).
4362 /* Add c to output string */
4368 /* Terminate attribute value in output area */
4369 *output_ptr++ = '\0';
4371 /* Check whether raw input matched null marker */
4372 input_len = end_ptr - start_ptr;
4373 if (!saw_quote && input_len == cstate->null_print_len &&
4374 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4375 cstate->raw_fields[fieldno] = NULL;
4378 /* Done if we hit EOL instead of a delim */
4383 /* Clean up state of attribute_buf */
4385 Assert(*output_ptr == '\0');
4386 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4393 * Read a binary attribute
4396 CopyReadBinaryAttribute(CopyState cstate,
4397 int column_no, FmgrInfo *flinfo,
4398 Oid typioparam, int32 typmod,
4404 if (!CopyGetInt32(cstate, &fld_size))
4406 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4407 errmsg("unexpected EOF in COPY data")));
4411 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4415 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4416 errmsg("invalid field size")));
4418 /* reset attribute_buf to empty, and load raw data in it */
4419 resetStringInfo(&cstate->attribute_buf);
4421 enlargeStringInfo(&cstate->attribute_buf, fld_size);
4422 if (CopyGetData(cstate, cstate->attribute_buf.data,
4423 fld_size, fld_size) != fld_size)
4425 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4426 errmsg("unexpected EOF in COPY data")));
4428 cstate->attribute_buf.len = fld_size;
4429 cstate->attribute_buf.data[fld_size] = '\0';
4431 /* Call the column type's binary input converter */
4432 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4433 typioparam, typmod);
4435 /* Trouble if it didn't eat the whole buffer */
4436 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4438 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4439 errmsg("incorrect binary data format")));
/*
 * DUMPSOFAR / CopyAttributeOutText
 *
 * DUMPSOFAR() hands the pending run of bytes between the locals start and
 * ptr to CopySendData; it relies on cstate, start and ptr being in scope at
 * the point of use.
 *
 * CopyAttributeOutText writes one attribute value in COPY text format.
 * When cstate->need_transcoding is set, the string is first converted with
 * pg_server_to_any to cstate->file_encoding.  The value is then scanned:
 *	- bytes below 0x20 are candidates for backslash escaping; the ones that
 *	  reach the conversion step are sent as a backslash followed by c
 *	- a backslash or the delimiter cstate->delim[0] gets a backslash in
 *	  front and is itself kept as the first byte of the next literal run
 *	- everything else accumulates into a run that is sent in one call
 * The scan loop exists twice.  The copy used when
 * cstate->encoding_embeds_ascii is set additionally steps over bytes with
 * the high bit set using pg_encoding_mblen.
 *
 * NOTE(review): the macro body, the return type, the control-character
 * switch and several short statements are not visible in this excerpt;
 * confirm details against the complete file.
 */
4446 * Send text representation of one attribute, with conversion and escaping
4448 #define DUMPSOFAR() \
4451 CopySendData(cstate, start, ptr - start); \
4455 CopyAttributeOutText(CopyState cstate, char *string)
4460 char delimc = cstate->delim[0];
4462 if (cstate->need_transcoding)
4463 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4468 * We have to grovel through the string searching for control characters
4469 * and instances of the delimiter character. In most cases, though, these
4470 * are infrequent. To avoid overhead from calling CopySendData once per
4471 * character, we dump out all characters between escaped characters in a
4472 * single call. The loop invariant is that the data from "start" to "ptr"
4473 * can be sent literally, but hasn't yet been.
4475 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4476 * in valid backend encodings, extra bytes of a multibyte character never
4477 * look like ASCII. This loop is sufficiently performance-critical that
4478 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4479 * of the normal safe-encoding path.
4481 if (cstate->encoding_embeds_ascii)
4484 while ((c = *ptr) != '\0')
4486 if ((unsigned char) c < (unsigned char) 0x20)
4489 * \r and \n must be escaped, the others are traditional. We
4490 * prefer to dump these using the C-like notation, rather than
4491 * a backslash and the literal character, because it makes the
4492 * dump file a bit more proof against Microsoftish data
4516 /* If it's the delimiter, must backslash it */
4519 /* All ASCII control chars are length 1 */
4521 continue; /* fall to end of loop */
4523 /* if we get here, we need to convert the control char */
4525 CopySendChar(cstate, '\\');
4526 CopySendChar(cstate, c);
4527 start = ++ptr; /* do not include char in next run */
4529 else if (c == '\\' || c == delimc)
4532 CopySendChar(cstate, '\\');
4533 start = ptr++; /* we include char in next run */
4535 else if (IS_HIGHBIT_SET(c))
4536 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4544 while ((c = *ptr) != '\0')
4546 if ((unsigned char) c < (unsigned char) 0x20)
4549 * \r and \n must be escaped, the others are traditional. We
4550 * prefer to dump these using the C-like notation, rather than
4551 * a backslash and the literal character, because it makes the
4552 * dump file a bit more proof against Microsoftish data
4576 /* If it's the delimiter, must backslash it */
4579 /* All ASCII control chars are length 1 */
4581 continue; /* fall to end of loop */
4583 /* if we get here, we need to convert the control char */
4585 CopySendChar(cstate, '\\');
4586 CopySendChar(cstate, c);
4587 start = ++ptr; /* do not include char in next run */
4589 else if (c == '\\' || c == delimc)
4592 CopySendChar(cstate, '\\');
4593 start = ptr++; /* we include char in next run */
4604 * Send text representation of one attribute, with conversion and
4605 * CSV-style escaping
4608 CopyAttributeOutCSV(CopyState cstate, char *string,
4609 bool use_quote, bool single_attr)
4614 char delimc = cstate->delim[0];
4615 char quotec = cstate->quote[0];
4616 char escapec = cstate->escape[0];
4618 /* force quoting if it matches null_print (before conversion!) */
4619 if (!use_quote && strcmp(string, cstate->null_print) == 0)
4622 if (cstate->need_transcoding)
4623 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4628 * Make a preliminary pass to discover if it needs quoting
4633 * Because '\.' can be a data value, quote it if it appears alone on a
4634 * line so it is not interpreted as the end-of-data marker.
4636 if (single_attr && strcmp(ptr, "\\.") == 0)
4642 while ((c = *tptr) != '\0')
4644 if (c == delimc || c == quotec || c == '\n' || c == '\r')
4649 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4650 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4659 CopySendChar(cstate, quotec);
4662 * We adopt the same optimization strategy as in CopyAttributeOutText
4665 while ((c = *ptr) != '\0')
4667 if (c == quotec || c == escapec)
4670 CopySendChar(cstate, escapec);
4671 start = ptr; /* we include char in next run */
4673 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4674 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4680 CopySendChar(cstate, quotec);
4684 /* If it doesn't need quoting, we can just dump it as-is */
4685 CopySendString(cstate, ptr);
4690 * CopyGetAttnums - build an integer list of attnums to be copied
4692 * The input attnamelist is either the user-specified column list,
4693 * or NIL if there was none (in which case we want all the non-dropped
4696 * rel can be NULL ... it's only used for error reports.
4699 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4701 List *attnums = NIL;
4703 if (attnamelist == NIL)
4705 /* Generate default column list */
4706 Form_pg_attribute *attr = tupDesc->attrs;
4707 int attr_count = tupDesc->natts;
4710 for (i = 0; i < attr_count; i++)
4712 if (attr[i]->attisdropped)
4714 attnums = lappend_int(attnums, i + 1);
4719 /* Validate the user-supplied list and extract attnums */
4722 foreach(l, attnamelist)
4724 char *name = strVal(lfirst(l));
4728 /* Lookup column name */
4729 attnum = InvalidAttrNumber;
4730 for (i = 0; i < tupDesc->natts; i++)
4732 if (tupDesc->attrs[i]->attisdropped)
4734 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4736 attnum = tupDesc->attrs[i]->attnum;
4740 if (attnum == InvalidAttrNumber)
4744 (errcode(ERRCODE_UNDEFINED_COLUMN),
4745 errmsg("column \"%s\" of relation \"%s\" does not exist",
4746 name, RelationGetRelationName(rel))));
4749 (errcode(ERRCODE_UNDEFINED_COLUMN),
4750 errmsg("column \"%s\" does not exist",
4753 /* Check for duplicates */
4754 if (list_member_int(attnums, attnum))
4756 (errcode(ERRCODE_DUPLICATE_COLUMN),
4757 errmsg("column \"%s\" specified more than once",
4759 attnums = lappend_int(attnums, attnum);
/*
 * copy_dest_startup
 *
 * Startup callback of the COPY DestReceiver (installed as pub.rStartup by
 * CreateCopyDestReceiver).  Receives the receiver itself, an operation code
 * and the descriptor of the tuples that will follow.
 *
 * NOTE(review): the return type and the function body are not visible in
 * this excerpt; confirm against the complete file.
 */
4768 * copy_dest_startup --- executor startup
4771 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4777 * copy_dest_receive --- receive one tuple
4780 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4782 DR_copy *myState = (DR_copy *) self;
4783 CopyState cstate = myState->cstate;
4785 /* Make sure the tuple is fully deconstructed */
4786 slot_getallattrs(slot);
4788 /* And send the data */
4789 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4790 myState->processed++;
/*
 * copy_dest_shutdown
 *
 * Shutdown callback of the COPY DestReceiver (installed as pub.rShutdown by
 * CreateCopyDestReceiver).
 *
 * NOTE(review): the return type and the function body are not visible in
 * this excerpt; confirm against the complete file.
 */
4796 * copy_dest_shutdown --- executor end
4799 copy_dest_shutdown(DestReceiver *self)
/*
 * copy_dest_destroy
 *
 * Destroy callback of the COPY DestReceiver (installed as pub.rDestroy by
 * CreateCopyDestReceiver); takes the receiver object to dispose of.
 *
 * NOTE(review): the return type and the function body are not visible in
 * this excerpt; confirm how the object is released against the complete
 * file.
 */
4805 * copy_dest_destroy --- release DestReceiver object
4808 copy_dest_destroy(DestReceiver *self)
4814 * CreateCopyDestReceiver -- create a suitable DestReceiver object
4817 CreateCopyDestReceiver(void)
4819 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4821 self->pub.receiveSlot = copy_dest_receive;
4822 self->pub.rStartup = copy_dest_startup;
4823 self->pub.rShutdown = copy_dest_shutdown;
4824 self->pub.rDestroy = copy_dest_destroy;
4825 self->pub.mydest = DestCopyOut;
4827 self->cstate = NULL; /* will be set later */
4828 self->processed = 0;
4830 return (DestReceiver *) self;