* copy.c
* Implements the COPY utility command
*
- * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "nodes/makefuncs.h"
+#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
{
COPY_FILE, /* to/from file (or a piped program) */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
- COPY_NEW_FE /* to/from frontend (3.0 protocol) */
+ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
+ COPY_CALLBACK /* to/from callback function */
} CopyDest;
/*
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool is_program; /* is 'filename' a program to popen? */
+ copy_data_source_cb data_source_cb; /* function for reading data */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool freeze; /* freeze rows on loading? */
ExprState **defexprs; /* array of default att expressions */
bool volatile_defexprs; /* is any of defexprs volatile? */
List *range_table;
+
PartitionDispatch *partition_dispatch_info;
- int num_dispatch;
- int num_partitions;
- ResultRelInfo *partitions;
+ int num_dispatch; /* Number of entries in the above array */
+ int num_partitions; /* Number of members in the following arrays */
+ ResultRelInfo *partitions; /* Per partition result relation */
TupleConversionMap **partition_tupconv_maps;
+ TupleTableSlot *partition_tuple_slot;
/*
* These variables are used to reduce overhead in textual COPY FROM.
/* non-export function prototypes */
-static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, Node *raw_query,
- const Oid queryRelId, List *attnamelist,
+static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
+ RawStmt *raw_query, Oid queryRelId, List *attnamelist,
List *options);
static void EndCopy(CopyState cstate);
static void ClosePipeToProgram(CopyState cstate);
-static CopyState BeginCopyTo(ParseState *pstate, Relation rel, Node *query,
- const Oid queryRelId, const char *filename, bool is_program,
+static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
+ Oid queryRelId, const char *filename, bool is_program,
List *attnamelist, List *options);
static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
-static uint64 CopyFrom(CopyState cstate);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
CommandId mycid, int hi_options,
ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
pq_sendint(&buf, format, 2); /* per-column formats */
pq_endmessage(&buf);
cstate->copy_dest = COPY_NEW_FE;
- cstate->fe_msgbuf = makeLongStringInfo();
+ cstate->fe_msgbuf = makeStringInfo();
}
else
{
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ Assert(false); /* Not yet supported. */
+ break;
}
resetStringInfo(fe_msgbuf);
bytesread += avail;
}
break;
+ case COPY_CALLBACK:
+ bytesread = cstate->data_source_cb(databuf, minread, maxread);
+ break;
}
return bytesread;
* Do not allow the copy if user doesn't have proper permission to access
* the table or the specifically requested columns.
*/
-Oid
-DoCopy(ParseState *pstate, const CopyStmt *stmt, uint64 *processed)
+void
+DoCopy(ParseState *pstate, const CopyStmt *stmt,
+ int stmt_location, int stmt_len,
+ uint64 *processed)
{
CopyState cstate;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL);
Relation rel;
Oid relid;
- Node *query = NULL;
- List *range_table = NIL;
+ RawStmt *query = NULL;
/* Disallow COPY to/from file or program except to superusers. */
if (!pipe && !superuser())
if (stmt->relation)
{
TupleDesc tupDesc;
- AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
List *attnums;
ListCell *cur;
RangeTblEntry *rte;
relid = RelationGetRelid(rel);
- rte = makeNode(RangeTblEntry);
- rte->rtekind = RTE_RELATION;
- rte->relid = RelationGetRelid(rel);
- rte->relkind = rel->rd_rel->relkind;
- rte->requiredPerms = required_access;
- range_table = list_make1(rte);
+ rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
+ rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
else
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
}
- ExecCheckRTPerms(range_table, true);
+ ExecCheckRTPerms(pstate->p_rtable, true);
/*
* Permission check for row security policies.
* relation which we have opened and locked.
*/
from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), -1);
+ pstrdup(RelationGetRelationName(rel)),
+ -1);
/* Build query */
select = makeNode(SelectStmt);
select->targetList = targetList;
select->fromClause = list_make1(from);
- query = (Node *) select;
+ query = makeNode(RawStmt);
+ query->stmt = (Node *) select;
+ query->stmt_location = stmt_location;
+ query->stmt_len = stmt_len;
/*
* Close the relation for now, but keep the lock on it to prevent
{
Assert(stmt->query);
- query = stmt->query;
+ query = makeNode(RawStmt);
+ query->stmt = stmt->query;
+ query->stmt_location = stmt_location;
+ query->stmt_len = stmt_len;
+
relid = InvalidOid;
rel = NULL;
}
PreventCommandIfParallelMode("COPY FROM");
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
- cstate->range_table = range_table;
+ NULL, stmt->attlist, stmt->options);
*processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
}
*/
if (rel != NULL)
heap_close(rel, (is_from ? NoLock : AccessShareLock));
-
- return relid;
}
/*
/* Extract options from the statement node tree */
foreach(option, options)
{
- DefElem *defel = (DefElem *) lfirst(option);
+ DefElem *defel = lfirst_node(DefElem, option);
if (strcmp(defel->defname, "format") == 0)
{
if (defel->arg && IsA(defel->arg, A_Star))
cstate->force_quote_all = true;
else if (defel->arg && IsA(defel->arg, List))
- cstate->force_quote = (List *) defel->arg;
+ cstate->force_quote = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
if (defel->arg && IsA(defel->arg, List))
- cstate->force_notnull = (List *) defel->arg;
+ cstate->force_notnull = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
if (defel->arg && IsA(defel->arg, List))
- cstate->force_null = (List *) defel->arg;
+ cstate->force_null = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
parser_errposition(pstate, defel->location)));
cstate->convert_selectively = true;
if (defel->arg == NULL || IsA(defel->arg, List))
- cstate->convert_select = (List *) defel->arg;
+ cstate->convert_select = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
BeginCopy(ParseState *pstate,
bool is_from,
Relation rel,
- Node *raw_query,
- const Oid queryRelId,
+ RawStmt *raw_query,
+ Oid queryRelId,
List *attnamelist,
List *options)
{
/* Initialize state for CopyFrom tuple routing. */
if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
- List *leaf_parts;
- ListCell *cell;
- int i,
- num_parted;
- ResultRelInfo *leaf_part_rri;
-
- /* Get the tuple-routing information and lock partitions */
- cstate->partition_dispatch_info =
- RelationGetPartitionDispatchInfo(rel, RowExclusiveLock,
- &num_parted,
- &leaf_parts);
+ PartitionDispatch *partition_dispatch_info;
+ ResultRelInfo *partitions;
+ TupleConversionMap **partition_tupconv_maps;
+ TupleTableSlot *partition_tuple_slot;
+ int num_parted,
+ num_partitions;
+
+ ExecSetupPartitionTupleRouting(rel,
+ &partition_dispatch_info,
+ &partitions,
+ &partition_tupconv_maps,
+ &partition_tuple_slot,
+ &num_parted, &num_partitions);
+ cstate->partition_dispatch_info = partition_dispatch_info;
cstate->num_dispatch = num_parted;
- cstate->num_partitions = list_length(leaf_parts);
- cstate->partitions = (ResultRelInfo *)
- palloc(cstate->num_partitions *
- sizeof(ResultRelInfo));
- cstate->partition_tupconv_maps = (TupleConversionMap **)
- palloc0(cstate->num_partitions *
- sizeof(TupleConversionMap *));
-
- leaf_part_rri = cstate->partitions;
- i = 0;
- foreach(cell, leaf_parts)
- {
- Relation partrel;
-
- /*
- * We locked all the partitions above including the leaf
- * partitions. Note that each of the relations in
- * cstate->partitions will be closed by CopyFrom() after it's
- * finished with its processing.
- */
- partrel = heap_open(lfirst_oid(cell), NoLock);
-
- /*
- * Verify result relation is a valid target for the current
- * operation.
- */
- CheckValidResultRel(partrel, CMD_INSERT);
-
- InitResultRelInfo(leaf_part_rri,
- partrel,
- 1, /* dummy */
- false, /* no partition constraint
- * check */
- 0);
-
- /* Open partition indices */
- ExecOpenIndices(leaf_part_rri, false);
-
- if (!equalTupleDescs(tupDesc, RelationGetDescr(partrel)))
- cstate->partition_tupconv_maps[i] =
- convert_tuples_by_name(tupDesc,
- RelationGetDescr(partrel),
- gettext_noop("could not convert row type"));
- leaf_part_rri++;
- i++;
- }
+ cstate->partitions = partitions;
+ cstate->num_partitions = num_partitions;
+ cstate->partition_tupconv_maps = partition_tupconv_maps;
+ cstate->partition_tuple_slot = partition_tuple_slot;
}
}
else
* function and is executed repeatedly. (See also the same hack in
* DECLARE CURSOR and PREPARE.) XXX FIXME someday.
*/
- rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
- pstate->p_sourcetext, NULL, 0);
+ rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
+ pstate->p_sourcetext, NULL, 0,
+ NULL);
/* check that we got back something we can work with */
if (rewritten == NIL)
/* examine queries to determine which error message to issue */
foreach(lc, rewritten)
{
- Query *q = (Query *) lfirst(lc);
+ Query *q = lfirst_node(Query, lc);
if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
ereport(ERROR,
errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
}
- query = (Query *) linitial(rewritten);
+ query = linitial_node(Query, rewritten);
/* The grammar allows SELECT INTO, but we don't support that */
if (query->utilityStmt != NULL &&
cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
GetActiveSnapshot(),
InvalidSnapshot,
- dest, NULL, 0);
+ dest, NULL, NULL, 0);
/*
* Call ExecutorStart to prepare the plan for execution.
static CopyState
BeginCopyTo(ParseState *pstate,
Relation rel,
- Node *query,
- const Oid queryRelId,
+ RawStmt *query,
+ Oid queryRelId,
const char *filename,
bool is_program,
List *attnamelist,
cstate->null_print_client = cstate->null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
- cstate->fe_msgbuf = makeLongStringInfo();
+ cstate->fe_msgbuf = makeStringInfo();
/* Get info about the columns we need to process. */
cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
else
{
/* run the plan --- the dest receiver will send tuples */
- ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
/*
* Copy FROM file to relation.
*/
-static uint64
+uint64
CopyFrom(CopyState cstate)
{
HeapTuple tuple;
uint64 processed = 0;
bool useHeapMultiInsert;
int nBufferedTuples = 0;
+ int prev_leaf_part_index = -1;
#define MAX_BUFFERED_TUPLES 1000
HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
* earlier scan or command. This ensures that if this subtransaction
* aborts then the frozen rows won't be visible after xact cleanup. Note
* that the stronger test of exactly which subtransaction created it is
- * crucial for correctness of this optimisation.
+ * crucial for correctness of this optimization.
*/
if (cstate->freeze)
{
InitResultRelInfo(resultRelInfo,
cstate->rel,
1, /* dummy rangetable index */
- true, /* do load partition check expression */
+ NULL,
0);
ExecOpenIndices(resultRelInfo, false);
Assert(leaf_part_index >= 0 &&
leaf_part_index < cstate->num_partitions);
+ /*
+ * If this tuple is mapped to a partition that is not the same as
+ * the previous one, we'd better make the bulk insert mechanism get
+ * a new buffer.
+ */
+ if (prev_leaf_part_index != leaf_part_index)
+ {
+ ReleaseBulkInsertStatePin(bistate);
+ prev_leaf_part_index = leaf_part_index;
+ }
+
/*
* Save the old ResultRelInfo and switch to the one corresponding
* to the selected partition.
map = cstate->partition_tupconv_maps[leaf_part_index];
if (map)
{
+ Relation partrel = resultRelInfo->ri_RelationDesc;
+
tuple = do_convert_tuple(tuple, map);
+
+ /*
+ * We must use the partition's tuple descriptor from this
+ * point on, until we're finished dealing with the partition.
+ * Use the dedicated slot for that.
+ */
+ slot = cstate->partition_tuple_slot;
+ Assert(slot != NULL);
+ ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
ExecStoreTuple(tuple, slot, InvalidBuffer, true);
}
* Remember cstate->partition_dispatch_info[0] corresponds to the root
* partitioned table, which we must not try to close, because it is
* the main target table of COPY that will be closed eventually by
- * DoCopy().
+ * DoCopy(). Also, tupslot is NULL for the root partitioned table.
*/
for (i = 1; i < cstate->num_dispatch; i++)
{
PartitionDispatch pd = cstate->partition_dispatch_info[i];
heap_close(pd->reldesc, NoLock);
+ ExecDropSingleTupleTableSlot(pd->tupslot);
}
for (i = 0; i < cstate->num_partitions; i++)
{
ExecCloseIndices(resultRelInfo);
heap_close(resultRelInfo->ri_RelationDesc, NoLock);
}
+
+ /* Release the standalone partition tuple slot */
+ ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
}
+ /* Close any trigger target relations */
+ ExecCleanUpTriggerState(estate);
+
FreeExecutorState(estate);
/*
Relation rel,
const char *filename,
bool is_program,
+ copy_data_source_cb data_source_cb,
List *attnamelist,
List *options)
{
cstate->cur_attval = NULL;
/* Set up variables to avoid per-attribute overhead. */
- initLongStringInfo(&cstate->attribute_buf);
- initLongStringInfo(&cstate->line_buf);
+ initStringInfo(&cstate->attribute_buf);
+ initStringInfo(&cstate->line_buf);
cstate->line_buf_converted = false;
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
+ /* Assign range table, we'll need it in CopyFrom. */
+ if (pstate)
+ cstate->range_table = pstate->p_rtable;
+
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
* the special case of when the default expression is the
* nextval() of a sequence which in this specific case is
* known to be safe for use with the multi-insert
- * optimisation. Hence we use this special case function
+ * optimization. Hence we use this special case function
* checker rather than the standard check for
* contain_volatile_functions().
*/
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
- if (pipe)
+ if (data_source_cb)
+ {
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_source_cb = data_source_cb;
+ }
+ else if (pipe)
{
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
- &nulls[defmap[i]], NULL);
+ &nulls[defmap[i]]);
}
return true;