1 /*-------------------------------------------------------------------------
5 * Copyright (c) 1994, Regents of the University of California
9 * $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.49 1998/07/15 18:53:40 momjian Exp $
11 *-------------------------------------------------------------------------
19 #include <access/heapam.h>
20 #include <tcop/dest.h>
22 #include <miscadmin.h>
23 #include <utils/builtins.h>
24 #include <utils/acl.h>
26 #include <catalog/pg_index.h>
27 #include <utils/syscache.h>
28 #include <utils/memutils.h>
29 #include <executor/executor.h>
30 #include <access/transam.h>
31 #include <catalog/index.h>
32 #include <access/genam.h>
33 #include <catalog/pg_type.h>
34 #include <catalog/catname.h>
35 #include <catalog/pg_shadow.h>
36 #include <commands/copy.h>
37 #include "commands/trigger.h"
38 #include <storage/fd.h>
40 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
41 #define VALUE(c) ((c) - '0')
44 /* non-export function prototypes */
45 static void CopyTo(Relation rel, bool binary, bool oids, FILE *fp, char *delim);
46 static void CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, char *delim);
47 static Oid GetOutputFunction(Oid type);
48 static Oid GetTypeElement(Oid type);
49 static Oid GetInputFunction(Oid type);
50 static Oid IsTypeByVal(Oid type);
52 GetIndexRelations(Oid main_relation_oid,
54 Relation **index_rels);
57 static void CopyReadNewline(FILE *fp, int *newline);
58 static char *CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline);
61 static char *CopyReadAttribute(FILE *fp, bool *isnull, char *delim);
64 static void CopyAttributeOut(FILE *fp, char *string, char *delim, int is_array);
65 static int CountTuples(Relation relation);
73 * DoCopy executes a the SQL COPY statement.
77 DoCopy(char *relname, bool binary, bool oids, bool from, bool pipe,
78 char *filename, char *delim)
80 /*----------------------------------------------------------------------------
81 Either unload or reload contents of class <relname>, depending on <from>.
83 If <pipe> is false, transfer is between the class and the file named
84 <filename>. Otherwise, transfer is between the class and our regular
85 input/output stream. The latter could be either stdin/stdout or a
86 socket, depending on whether we're running under Postmaster control.
88 Iff <binary>, unload or reload in the binary format, as opposed to the
89 more wasteful but more robust and portable text format.
91 If in the text format, delimit columns with delimiter <delim>.
93 When loading in the text format from an input stream (as opposed to
94 a file), recognize a "." on a line by itself as EOF. Also recognize
95 a stream EOF. When unloading in the text format to an output stream,
96 write a "." on a line by itself at the end of the data.
98 Iff <oids>, unload or reload the format that includes OID information.
100 Do not allow a Postgres user without superuser privilege to read from
103 Do not allow the copy if user doesn't have proper permission to access
105 ----------------------------------------------------------------------------*/
109 extern char *UserName; /* defined in global.c */
110 const AclMode required_access = from ? ACL_WR : ACL_RD;
113 rel = heap_openr(relname);
115 elog(ERROR, "COPY command failed. Class %s "
116 "does not exist.", relname);
118 result = pg_aclcheck(relname, UserName, required_access);
119 if (result != ACLCHECK_OK)
120 elog(ERROR, "%s: %s", relname, aclcheck_error_strings[result]);
121 /* Above should not return */
122 else if (!superuser() && !pipe)
123 elog(ERROR, "You must have Postgres superuser privilege to do a COPY "
124 "directly to or from a file. Anyone can COPY to stdout or "
125 "from stdin. Psql's \\copy command also works for anyone.");
126 /* Above should not return. */
130 { /* copy from file to database */
131 if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
132 elog(ERROR, "You can't change sequence relation %s", relname);
135 if (IsUnderPostmaster)
145 fp = AllocateFile(filename, "r");
147 elog(ERROR, "COPY command, running in backend with "
148 "effective uid %d, could not open file '%s' for "
149 "reading. Errno = %s (%d).",
150 geteuid(), filename, strerror(errno), errno);
151 /* Above should not return */
153 CopyFrom(rel, binary, oids, fp, delim);
156 { /* copy from database to file */
159 if (IsUnderPostmaster)
169 mode_t oumask; /* Pre-existing umask value */
171 oumask = umask((mode_t) 0);
172 fp = AllocateFile(filename, "w");
175 elog(ERROR, "COPY command, running in backend with "
176 "effective uid %d, could not open file '%s' for "
177 "writing. Errno = %s (%d).",
178 geteuid(), filename, strerror(errno), errno);
179 /* Above should not return */
181 CopyTo(rel, binary, oids, fp, delim);
185 else if (!from && !binary)
188 if (IsUnderPostmaster)
197 CopyTo(Relation rel, bool binary, bool oids, FILE *fp, char *delim)
200 HeapScanDesc scandesc;
204 AttributeTupleForm *attr;
205 FmgrInfo *out_functions;
210 bool isnull; /* The attribute we are copying is null */
214 * <nulls> is a (dynamically allocated) array with one character per
215 * attribute in the instance being copied. nulls[I-1] is 'n' if
216 * Attribute Number I is null, and ' ' otherwise.
218 * <nulls> is meaningful only if we are doing a binary copy.
224 scandesc = heap_beginscan(rel, 0, false, 0, NULL);
226 attr_count = rel->rd_att->natts;
227 attr = rel->rd_att->attrs;
228 tupDesc = rel->rd_att;
232 out_functions = (FmgrInfo *) palloc(attr_count * sizeof(FmgrInfo));
233 elements = (Oid *) palloc(attr_count * sizeof(Oid));
234 typmod = (int32 *) palloc(attr_count * sizeof(int32));
235 for (i = 0; i < attr_count; i++)
237 out_func_oid = (Oid) GetOutputFunction(attr[i]->atttypid);
238 fmgr_info(out_func_oid, &out_functions[i]);
239 elements[i] = GetTypeElement(attr[i]->atttypid);
240 typmod[i] = attr[i]->atttypmod;
242 nulls = NULL; /* meaningless, but compiler doesn't know
249 out_functions = NULL;
250 nulls = (char *) palloc(attr_count);
251 for (i = 0; i < attr_count; i++)
256 ntuples = CountTuples(rel);
257 fwrite(&ntuples, sizeof(int32), 1, fp);
260 for (tuple = heap_getnext(scandesc, 0, NULL);
262 tuple = heap_getnext(scandesc, 0, NULL))
267 fputs(oidout(tuple->t_oid), fp);
271 for (i = 0; i < attr_count; i++)
273 value = heap_getattr(tuple, i + 1, tupDesc, &isnull);
278 string = (char *) (*fmgr_faddr(&out_functions[i]))
279 (value, elements[i], typmod[i]);
280 CopyAttributeOut(fp, string, delim, attr[i]->attnelems);
284 fputs("\\N", fp); /* null indicator */
286 if (i == attr_count - 1)
292 * when copying out, only use the first char of the
302 * only interesting thing heap_getattr tells us in this
303 * case is if we have a null attribute or not.
315 for (i = 0; i < attr_count; i++)
321 length = tuple->t_len - tuple->t_hoff;
322 fwrite(&length, sizeof(int32), 1, fp);
324 fwrite((char *) &tuple->t_oid, sizeof(int32), 1, fp);
326 fwrite(&null_ct, sizeof(int32), 1, fp);
329 for (i = 0; i < attr_count; i++)
333 fwrite(&i, sizeof(int32), 1, fp);
338 fwrite((char *) tuple + tuple->t_hoff, length, 1, fp);
342 heap_endscan(scandesc);
347 pfree(out_functions);
356 CopyFrom(Relation rel, bool binary, bool oids, FILE *fp, char *delim)
359 AttrNumber attr_count;
360 AttributeTupleForm *attr;
361 FmgrInfo *in_functions;
373 Relation *index_rels;
379 bool reading_to_eof = true;
382 FuncIndexInfo *finfo,
384 TupleDesc *itupdescArr;
385 HeapTuple pgIndexTup;
386 IndexTupleForm *pgIndexP = NULL;
387 int *indexNatts = NULL;
389 Node **indexPred = NULL;
391 ExprContext *econtext = NULL;
393 #ifndef OMIT_PARTIAL_INDEX
394 TupleTable tupleTable;
395 TupleTableSlot *slot = NULL;
402 InsertIndexResult indexRes;
405 bool skip_tuple = false;
407 tupDesc = RelationGetTupleDescriptor(rel);
408 attr = tupDesc->attrs;
409 attr_count = tupDesc->natts;
414 * This may be a scalar or a functional index. We initialize all
415 * kinds of arrays here to avoid doing extra work at every tuple copy.
418 if (rel->rd_rel->relhasindex)
420 GetIndexRelations(rel->rd_id, &n_indices, &index_rels);
425 (TupleDesc *) palloc(n_indices * sizeof(TupleDesc));
427 (IndexTupleForm *) palloc(n_indices * sizeof(IndexTupleForm));
428 indexNatts = (int *) palloc(n_indices * sizeof(int));
429 finfo = (FuncIndexInfo *) palloc(n_indices * sizeof(FuncIndexInfo));
430 finfoP = (FuncIndexInfo **) palloc(n_indices * sizeof(FuncIndexInfo *));
431 indexPred = (Node **) palloc(n_indices * sizeof(Node *));
433 for (i = 0; i < n_indices; i++)
435 itupdescArr[i] = RelationGetTupleDescriptor(index_rels[i]);
437 SearchSysCacheTuple(INDEXRELID,
438 ObjectIdGetDatum(index_rels[i]->rd_id),
441 pgIndexP[i] = (IndexTupleForm) GETSTRUCT(pgIndexTup);
442 for (attnumP = &(pgIndexP[i]->indkey[0]), natts = 0;
443 *attnumP != InvalidAttrNumber;
445 if (pgIndexP[i]->indproc != InvalidOid)
447 FIgetnArgs(&finfo[i]) = natts;
449 FIgetProcOid(&finfo[i]) = pgIndexP[i]->indproc;
450 *(FIgetname(&finfo[i])) = '\0';
451 finfoP[i] = &finfo[i];
454 finfoP[i] = (FuncIndexInfo *) NULL;
455 indexNatts[i] = natts;
456 if (VARSIZE(&pgIndexP[i]->indpred) != 0)
458 predString = fmgr(F_TEXTOUT, &pgIndexP[i]->indpred);
459 indexPred[i] = stringToNode(predString);
461 /* make dummy ExprContext for use by ExecQual */
462 if (econtext == NULL)
464 #ifndef OMIT_PARTIAL_INDEX
465 tupleTable = ExecCreateTupleTable(1);
466 slot = ExecAllocTableSlot(tupleTable);
467 econtext = makeNode(ExprContext);
468 econtext->ecxt_scantuple = slot;
469 rtupdesc = RelationGetTupleDescriptor(rel);
470 slot->ttc_tupleDescriptor = rtupdesc;
473 * There's no buffer associated with heap tuples
474 * here, so I set the slot's buffer to NULL.
475 * Currently, it appears that the only way a
476 * buffer could be needed would be if the partial
477 * index predicate referred to the "lock" system
478 * attribute. If it did, then heap_getattr would
479 * call HeapTupleGetRuleLock, which uses the
480 * buffer's descriptor to get the relation id.
481 * Rather than try to fix this, I'll just disallow
482 * partial indexes on "lock", which wouldn't be
483 * useful anyway. --Nels, Nov '92
485 /* SetSlotBuffer(slot, (Buffer) NULL); */
486 /* SetSlotShouldFree(slot, false); */
487 slot->ttc_buffer = (Buffer) NULL;
488 slot->ttc_shouldFree = false;
489 #endif /* OMIT_PARTIAL_INDEX */
500 in_functions = (FmgrInfo *) palloc(attr_count * sizeof(FmgrInfo));
501 elements = (Oid *) palloc(attr_count * sizeof(Oid));
502 typmod = (int32 *) palloc(attr_count * sizeof(int32));
503 for (i = 0; i < attr_count; i++)
505 in_func_oid = (Oid) GetInputFunction(attr[i]->atttypid);
506 fmgr_info(in_func_oid, &in_functions[i]);
507 elements[i] = GetTypeElement(attr[i]->atttypid);
508 typmod[i] = attr[i]->atttypmod;
516 fread(&ntuples, sizeof(int32), 1, fp);
518 reading_to_eof = false;
521 values = (Datum *) palloc(sizeof(Datum) * attr_count);
522 nulls = (char *) palloc(attr_count);
523 index_nulls = (char *) palloc(attr_count);
524 idatum = (Datum *) palloc(sizeof(Datum) * attr_count);
525 byval = (bool *) palloc(attr_count * sizeof(bool));
527 for (i = 0; i < attr_count; i++)
530 index_nulls[i] = ' ';
531 byval[i] = (bool) IsTypeByVal(attr[i]->atttypid);
547 string = CopyReadAttribute(fp, &isnull, delim, &newline);
549 string = CopyReadAttribute(fp, &isnull, delim);
555 loaded_oid = oidin(string);
556 if (loaded_oid < BootstrapObjectIdData)
557 elog(ERROR, "COPY TEXT: Invalid Oid. line: %d", lineno);
560 for (i = 0; i < attr_count && !done; i++)
563 string = CopyReadAttribute(fp, &isnull, delim, &newline);
565 string = CopyReadAttribute(fp, &isnull, delim);
569 values[i] = PointerGetDatum(NULL);
572 else if (string == NULL)
577 (Datum) (*fmgr_faddr(&in_functions[i])) (string,
582 * Sanity check - by reference attributes cannot
585 if (!PointerIsValid(values[i]) &&
586 !(rel->rd_att->attrs[i]->attbyval))
587 elog(ERROR, "copy from line %d: Bad file format", lineno);
592 CopyReadNewline(fp, &newline);
597 fread(&len, sizeof(int32), 1, fp);
604 fread(&loaded_oid, sizeof(int32), 1, fp);
605 if (loaded_oid < BootstrapObjectIdData)
606 elog(ERROR, "COPY BINARY: Invalid Oid line: %d", lineno);
608 fread(&null_ct, sizeof(int32), 1, fp);
611 for (i = 0; i < null_ct; i++)
613 fread(&null_id, sizeof(int32), 1, fp);
614 nulls[null_id] = 'n';
618 string = (char *) palloc(len);
619 fread(string, len, 1, fp);
623 for (i = 0; i < attr_count; i++)
625 if (byval[i] && nulls[i] != 'n')
628 switch (attr[i]->attlen)
631 values[i] = (Datum) *(unsigned char *) ptr;
635 ptr = (char *) SHORTALIGN(ptr);
636 values[i] = (Datum) *(unsigned short *) ptr;
637 ptr += sizeof(short);
640 ptr = (char *) INTALIGN(ptr);
641 values[i] = (Datum) *(uint32 *) ptr;
642 ptr += sizeof(int32);
645 elog(ERROR, "COPY BINARY: impossible size! line: %d", lineno);
649 else if (nulls[i] != 'n')
651 switch (attr[i]->attlen)
654 if (attr[i]->attalign == 'd')
655 ptr = (char *) DOUBLEALIGN(ptr);
657 ptr = (char *) INTALIGN(ptr);
658 values[i] = (Datum) ptr;
659 ptr += *(uint32 *) ptr;
662 values[i] = (Datum) ptr;
663 ptr += attr[i]->attlen;
666 ptr = (char *) SHORTALIGN(ptr);
667 values[i] = (Datum) ptr;
668 ptr += attr[i]->attlen;
671 ptr = (char *) INTALIGN(ptr);
672 values[i] = (Datum) ptr;
673 ptr += attr[i]->attlen;
676 if (attr[i]->attalign == 'd')
677 ptr = (char *) DOUBLEALIGN(ptr);
679 ptr = (char *) LONGALIGN(ptr);
680 values[i] = (Datum) ptr;
681 ptr += attr[i]->attlen;
691 * Does it have any sence ? - vadim 12/14/96
693 * tupDesc = CreateTupleDesc(attr_count, attr);
695 tuple = heap_formtuple(tupDesc, values, nulls);
697 tuple->t_oid = loaded_oid;
700 /* BEFORE ROW INSERT Triggers */
702 rel->trigdesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
706 newtuple = ExecBRInsertTriggers(rel, tuple);
708 if (newtuple == NULL) /* "do nothing" */
710 else if (newtuple != tuple) /* modified by Trigger(s) */
720 * Check the constraints of a tuple
724 if (rel->rd_att->constr)
728 newtuple = ExecConstraints("CopyFrom", rel, tuple);
730 if (newtuple != tuple)
737 heap_insert(rel, tuple);
741 for (i = 0; i < n_indices; i++)
743 if (indexPred[i] != NULL)
745 #ifndef OMIT_PARTIAL_INDEX
748 * if tuple doesn't satisfy predicate, don't
752 /* SetSlotContents(slot, tuple); */
753 if (ExecQual((List *) indexPred[i], econtext) == false)
755 #endif /* OMIT_PARTIAL_INDEX */
757 FormIndexDatum(indexNatts[i],
758 (AttrNumber *) &(pgIndexP[i]->indkey[0]),
765 indexRes = index_insert(index_rels[i], idatum, index_nulls,
766 &(tuple->t_ctid), rel);
771 /* AFTER ROW INSERT Triggers */
773 rel->trigdesc->n_after_row[TRIGGER_EVENT_INSERT] > 0)
774 ExecARInsertTriggers(rel, tuple);
780 for (i = 0; i < attr_count; i++)
782 if (!byval[i] && nulls[i] != 'n')
785 pfree((void *) values[i]);
787 else if (nulls[i] == 'n')
794 if (!reading_to_eof && ntuples == tuples_read)
812 GetOutputFunction(Oid type)
816 typeTuple = SearchSysCacheTuple(TYPOID,
817 ObjectIdGetDatum(type),
820 if (HeapTupleIsValid(typeTuple))
821 return ((int) ((TypeTupleForm) GETSTRUCT(typeTuple))->typoutput);
823 elog(ERROR, "GetOutputFunction: Cache lookup of type %d failed", type);
828 GetTypeElement(Oid type)
832 typeTuple = SearchSysCacheTuple(TYPOID,
833 ObjectIdGetDatum(type),
837 if (HeapTupleIsValid(typeTuple))
838 return ((int) ((TypeTupleForm) GETSTRUCT(typeTuple))->typelem);
840 elog(ERROR, "GetOutputFunction: Cache lookup of type %d failed", type);
845 GetInputFunction(Oid type)
849 typeTuple = SearchSysCacheTuple(TYPOID,
850 ObjectIdGetDatum(type),
853 if (HeapTupleIsValid(typeTuple))
854 return ((int) ((TypeTupleForm) GETSTRUCT(typeTuple))->typinput);
856 elog(ERROR, "GetInputFunction: Cache lookup of type %d failed", type);
861 IsTypeByVal(Oid type)
865 typeTuple = SearchSysCacheTuple(TYPOID,
866 ObjectIdGetDatum(type),
869 if (HeapTupleIsValid(typeTuple))
870 return ((int) ((TypeTupleForm) GETSTRUCT(typeTuple))->typbyval);
872 elog(ERROR, "GetInputFunction: Cache lookup of type %d failed", type);
878 * Given the OID of a relation, return an array of index relation descriptors
879 * and the number of index relations. These relation descriptors are open
882 * Space for the array itself is palloc'ed.
885 typedef struct rel_list
888 struct rel_list *next;
892 GetIndexRelations(Oid main_relation_oid,
894 Relation **index_rels)
898 Relation pg_index_rel;
899 HeapScanDesc scandesc;
900 Oid index_relation_oid;
906 pg_index_rel = heap_openr(IndexRelationName);
907 scandesc = heap_beginscan(pg_index_rel, 0, false, 0, NULL);
908 tupDesc = RelationGetTupleDescriptor(pg_index_rel);
912 head = (RelationList *) palloc(sizeof(RelationList));
916 for (tuple = heap_getnext(scandesc, 0, NULL);
918 tuple = heap_getnext(scandesc, 0, NULL))
922 (Oid) DatumGetInt32(heap_getattr(tuple, 2,
924 if (index_relation_oid == main_relation_oid)
926 scan->index_rel_oid =
927 (Oid) DatumGetInt32(heap_getattr(tuple,
928 Anum_pg_index_indexrelid,
931 scan->next = (RelationList *) palloc(sizeof(RelationList));
936 heap_endscan(scandesc);
937 heap_close(pg_index_rel);
939 /* We cannot trust to relhasindex of the main_relation now, so... */
943 *index_rels = (Relation *) palloc(*n_indices * sizeof(Relation));
945 for (i = 0, scan = head; i < *n_indices; i++, scan = scan->next)
946 (*index_rels)[i] = index_open(scan->index_rel_oid);
948 for (i = 0, scan = head; i < *n_indices + 1; i++)
956 #define EXT_ATTLEN 5*8192
959 returns 1 is c is in s
962 inString(char c, char *s)
981 * Reads input from fp until an end of line is seen.
985 CopyReadNewline(FILE *fp, int *newline)
989 elog(NOTICE, "CopyReadNewline: line %d - extra fields ignored", lineno);
990 while (!feof(fp) && (getc(fp) != '\n'));
998 * Reads input from fp until eof is seen. If we are reading from standard
999 * input, AND we see a dot on a line by itself (a dot followed immediately
1000 * by a newline), we exit as if we saw eof. This is so that copy pipelines
1001 * can be used as standard input.
1006 CopyReadAttribute(FILE *fp, bool *isnull, char *delim, int *newline)
1008 CopyReadAttribute(FILE *fp, bool *isnull, char *delim)
1011 static char attribute[EXT_ATTLEN];
1017 /* if last delimiter was a newline return a NULL attribute */
1020 *isnull = (bool) true;
1025 *isnull = (bool) false; /* set default */
1057 val = (val << 3) + VALUE(c);
1060 val = (val << 3) + VALUE(c);
1096 attribute[0] = '\0'; /* just to be safe */
1097 *isnull = (bool) true;
1102 elog(ERROR, "CopyReadAttribute - end of record marker corrupted. line: %d", lineno);
1107 else if (inString(c, delim) || c == '\n')
1117 if (i == EXT_ATTLEN - 1)
1118 elog(ERROR, "CopyReadAttribute - attribute length too long. line: %d", lineno);
1120 attribute[i] = '\0';
1121 return (&attribute[0]);
1125 CopyAttributeOut(FILE *fp, char *string, char *delim, int is_array)
1129 for (; (c = *string) != '\0'; string++)
1131 if (c == delim[0] || c == '\n' ||
1132 (c == '\\' && !is_array))
1134 else if (c == '\\' && is_array)
1136 if (*(string + 1) == '\\')
1138 /* translate \\ to \\\\ */
1144 else if (*(string + 1) == '"')
1146 /* translate \" to \\\" */
1156 * Returns the number of tuples in a relation. Unfortunately, currently
1157 * must do a scan of the entire relation to determine this.
1159 * relation is expected to be an open relation descriptor.
1162 CountTuples(Relation relation)
1164 HeapScanDesc scandesc;
1169 scandesc = heap_beginscan(relation, 0, false, 0, NULL);
1171 for (tuple = heap_getnext(scandesc, 0, NULL), i = 0;
1173 tuple = heap_getnext(scandesc, 0, NULL), i++)
1175 heap_endscan(scandesc);