* copy.c
* Implements the COPY utility command
*
- * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "nodes/makefuncs.h"
+#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
{
COPY_FILE, /* to/from file (or a piped program) */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
- COPY_NEW_FE /* to/from frontend (3.0 protocol) */
+ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
+ COPY_CALLBACK /* to/from callback function */
} CopyDest;
/*
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool is_program; /* is 'filename' a program to popen? */
+ copy_data_source_cb data_source_cb; /* function for reading data */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool freeze; /* freeze rows on loading? */
bool volatile_defexprs; /* is any of defexprs volatile? */
List *range_table;
+ PartitionDispatch *partition_dispatch_info;
+ int num_dispatch; /* Number of entries in the above array */
+ int num_partitions; /* Number of members in the following arrays */
+ ResultRelInfo *partitions; /* Per partition result relation */
+ TupleConversionMap **partition_tupconv_maps;
+ TupleTableSlot *partition_tuple_slot;
+
/*
* These variables are used to reduce overhead in textual COPY FROM.
*
/* non-export function prototypes */
-static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, Node *raw_query,
- const Oid queryRelId, List *attnamelist,
+static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
+ RawStmt *raw_query, Oid queryRelId, List *attnamelist,
List *options);
static void EndCopy(CopyState cstate);
static void ClosePipeToProgram(CopyState cstate);
-static CopyState BeginCopyTo(ParseState *pstate, Relation rel, Node *query,
- const Oid queryRelId, const char *filename, bool is_program,
+static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
+ Oid queryRelId, const char *filename, bool is_program,
List *attnamelist, List *options);
static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
-static uint64 CopyFrom(CopyState cstate);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
CommandId mycid, int hi_options,
ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
pq_endmessage(&buf);
cstate->copy_dest = COPY_NEW_FE;
}
- else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
+ else
{
/* old way */
if (cstate->binary)
pq_startcopyout();
cstate->copy_dest = COPY_OLD_FE;
}
- else
- {
- /* very old way */
- if (cstate->binary)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("COPY BINARY is not supported to stdout or from stdin")));
- pq_putemptymessage('B');
- /* grottiness needed for old COPY OUT protocol */
- pq_startcopyout();
- cstate->copy_dest = COPY_OLD_FE;
- }
}
static void
cstate->copy_dest = COPY_NEW_FE;
cstate->fe_msgbuf = makeStringInfo();
}
- else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
+ else
{
/* old way */
if (cstate->binary)
pq_startmsgread();
cstate->copy_dest = COPY_OLD_FE;
}
- else
- {
- /* very old way */
- if (cstate->binary)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("COPY BINARY is not supported to stdout or from stdin")));
- pq_putemptymessage('D');
- /* any error in old protocol will make us lose sync */
- pq_startmsgread();
- cstate->copy_dest = COPY_OLD_FE;
- }
/* We *must* flush here to ensure FE knows it can send. */
pq_flush();
}
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ Assert(false); /* Not yet supported. */
+ break;
}
resetStringInfo(fe_msgbuf);
bytesread += avail;
}
break;
+ case COPY_CALLBACK:
+ bytesread = cstate->data_source_cb(databuf, minread, maxread);
+ break;
}
return bytesread;
* Do not allow the copy if user doesn't have proper permission to access
* the table or the specifically requested columns.
*/
-Oid
-DoCopy(ParseState *pstate, const CopyStmt *stmt, uint64 *processed)
+void
+DoCopy(ParseState *pstate, const CopyStmt *stmt,
+ int stmt_location, int stmt_len,
+ uint64 *processed)
{
CopyState cstate;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL);
Relation rel;
Oid relid;
- Node *query = NULL;
- List *range_table = NIL;
+ RawStmt *query = NULL;
/* Disallow COPY to/from file or program except to superusers. */
if (!pipe && !superuser())
if (stmt->relation)
{
TupleDesc tupDesc;
- AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
List *attnums;
ListCell *cur;
RangeTblEntry *rte;
relid = RelationGetRelid(rel);
- rte = makeNode(RangeTblEntry);
- rte->rtekind = RTE_RELATION;
- rte->relid = RelationGetRelid(rel);
- rte->relkind = rel->rd_rel->relkind;
- rte->requiredPerms = required_access;
- range_table = list_make1(rte);
+ rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
+ rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
else
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
}
- ExecCheckRTPerms(range_table, true);
+ ExecCheckRTPerms(pstate->p_rtable, true);
/*
* Permission check for row security policies.
ColumnRef *cr;
ResTarget *target;
RangeVar *from;
+ List *targetList = NIL;
if (is_from)
ereport(ERROR,
errmsg("COPY FROM not supported with row-level security"),
errhint("Use INSERT statements instead.")));
- /* Build target list */
- cr = makeNode(ColumnRef);
-
+ /*
+ * Build target list
+ *
+ * If no columns are specified in the attribute list of the COPY
+ * command, then the target list is 'all' columns. Therefore, '*'
+ * should be used as the target list for the resulting SELECT
+ * statement.
+ *
+ * In the case that columns are specified in the attribute list,
+ * create a ColumnRef and ResTarget for each column and add them
+ * to the target list for the resulting SELECT statement.
+ */
if (!stmt->attlist)
+ {
+ cr = makeNode(ColumnRef);
cr->fields = list_make1(makeNode(A_Star));
- else
- cr->fields = stmt->attlist;
+ cr->location = -1;
- cr->location = 1;
+ target = makeNode(ResTarget);
+ target->name = NULL;
+ target->indirection = NIL;
+ target->val = (Node *) cr;
+ target->location = -1;
+
+ targetList = list_make1(target);
+ }
+ else
+ {
+ ListCell *lc;
- target = makeNode(ResTarget);
- target->name = NULL;
- target->indirection = NIL;
- target->val = (Node *) cr;
- target->location = 1;
+ foreach(lc, stmt->attlist)
+ {
+ /*
+ * Build the ColumnRef for each column. The ColumnRef
+ * 'fields' property is a String 'Value' node (see
+ * nodes/value.h) that corresponds to the column name
+ * respectively.
+ */
+ cr = makeNode(ColumnRef);
+ cr->fields = list_make1(lfirst(lc));
+ cr->location = -1;
+
+ /* Build the ResTarget and add the ColumnRef to it. */
+ target = makeNode(ResTarget);
+ target->name = NULL;
+ target->indirection = NIL;
+ target->val = (Node *) cr;
+ target->location = -1;
+
+ /* Add each column to the SELECT statement's target list */
+ targetList = lappend(targetList, target);
+ }
+ }
/*
* Build RangeVar for from clause, fully qualified based on the
* relation which we have opened and locked.
*/
from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), -1);
+ pstrdup(RelationGetRelationName(rel)),
+ -1);
/* Build query */
select = makeNode(SelectStmt);
- select->targetList = list_make1(target);
+ select->targetList = targetList;
select->fromClause = list_make1(from);
- query = (Node *) select;
+ query = makeNode(RawStmt);
+ query->stmt = (Node *) select;
+ query->stmt_location = stmt_location;
+ query->stmt_len = stmt_len;
/*
* Close the relation for now, but keep the lock on it to prevent
{
Assert(stmt->query);
- query = stmt->query;
+ query = makeNode(RawStmt);
+ query->stmt = stmt->query;
+ query->stmt_location = stmt_location;
+ query->stmt_len = stmt_len;
+
relid = InvalidOid;
rel = NULL;
}
PreventCommandIfParallelMode("COPY FROM");
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
- cstate->range_table = range_table;
+ NULL, stmt->attlist, stmt->options);
*processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
}
*/
if (rel != NULL)
heap_close(rel, (is_from ? NoLock : AccessShareLock));
-
- return relid;
}
/*
/* Extract options from the statement node tree */
foreach(option, options)
{
- DefElem *defel = (DefElem *) lfirst(option);
+ DefElem *defel = lfirst_node(DefElem, option);
if (strcmp(defel->defname, "format") == 0)
{
if (defel->arg && IsA(defel->arg, A_Star))
cstate->force_quote_all = true;
else if (defel->arg && IsA(defel->arg, List))
- cstate->force_quote = (List *) defel->arg;
+ cstate->force_quote = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("conflicting or redundant options"),
parser_errposition(pstate, defel->location)));
if (defel->arg && IsA(defel->arg, List))
- cstate->force_notnull = (List *) defel->arg;
+ cstate->force_notnull = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
if (defel->arg && IsA(defel->arg, List))
- cstate->force_null = (List *) defel->arg;
+ cstate->force_null = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
parser_errposition(pstate, defel->location)));
cstate->convert_selectively = true;
if (defel->arg == NULL || IsA(defel->arg, List))
- cstate->convert_select = (List *) defel->arg;
+ cstate->convert_select = castNode(List, defel->arg);
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
BeginCopy(ParseState *pstate,
bool is_from,
Relation rel,
- Node *raw_query,
- const Oid queryRelId,
+ RawStmt *raw_query,
+ Oid queryRelId,
List *attnamelist,
List *options)
{
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(cstate->rel))));
+
+ /* Initialize state for CopyFrom tuple routing. */
+ if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDispatch *partition_dispatch_info;
+ ResultRelInfo *partitions;
+ TupleConversionMap **partition_tupconv_maps;
+ TupleTableSlot *partition_tuple_slot;
+ int num_parted,
+ num_partitions;
+
+ ExecSetupPartitionTupleRouting(rel,
+ &partition_dispatch_info,
+ &partitions,
+ &partition_tupconv_maps,
+ &partition_tuple_slot,
+ &num_parted, &num_partitions);
+ cstate->partition_dispatch_info = partition_dispatch_info;
+ cstate->num_dispatch = num_parted;
+ cstate->partitions = partitions;
+ cstate->num_partitions = num_partitions;
+ cstate->partition_tupconv_maps = partition_tupconv_maps;
+ cstate->partition_tuple_slot = partition_tuple_slot;
+ }
}
else
{
* function and is executed repeatedly. (See also the same hack in
* DECLARE CURSOR and PREPARE.) XXX FIXME someday.
*/
- rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
- pstate->p_sourcetext, NULL, 0);
+ rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
+ pstate->p_sourcetext, NULL, 0,
+ NULL);
/* check that we got back something we can work with */
if (rewritten == NIL)
/* examine queries to determine which error message to issue */
foreach(lc, rewritten)
{
- Query *q = (Query *) lfirst(lc);
+ Query *q = lfirst_node(Query, lc);
if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
ereport(ERROR,
errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
}
- query = (Query *) linitial(rewritten);
+ query = linitial_node(Query, rewritten);
/* The grammar allows SELECT INTO, but we don't support that */
if (query->utilityStmt != NULL &&
cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
GetActiveSnapshot(),
InvalidSnapshot,
- dest, NULL, 0);
+ dest, NULL, NULL, 0);
/*
* Call ExecutorStart to prepare the plan for execution.
static CopyState
BeginCopyTo(ParseState *pstate,
Relation rel,
- Node *query,
- const Oid queryRelId,
+ RawStmt *query,
+ Oid queryRelId,
const char *filename,
bool is_program,
List *attnamelist,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from sequence \"%s\"",
RelationGetRelationName(rel))));
+ else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from partitioned table \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
else
{
/* run the plan --- the dest receiver will send tuples */
- ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
/*
* Copy FROM file to relation.
*/
-static uint64
+uint64
CopyFrom(CopyState cstate)
{
HeapTuple tuple;
Datum *values;
bool *nulls;
ResultRelInfo *resultRelInfo;
+ ResultRelInfo *saved_resultRelInfo = NULL;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
ExprContext *econtext;
TupleTableSlot *myslot;
uint64 processed = 0;
bool useHeapMultiInsert;
int nBufferedTuples = 0;
+ int prev_leaf_part_index = -1;
#define MAX_BUFFERED_TUPLES 1000
HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
Assert(cstate->rel);
- if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
+ /*
+ * The target must be a plain relation or have an INSTEAD OF INSERT row
+ * trigger. (Currently, such triggers are only allowed on views, so we
+ * only hint about them in the view case.)
+ */
+ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+ cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
+ !(cstate->rel->trigdesc &&
+ cstate->rel->trigdesc->trig_insert_instead_row))
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
- RelationGetRelationName(cstate->rel))));
+ RelationGetRelationName(cstate->rel)),
+ errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
* earlier scan or command. This ensures that if this subtransaction
* aborts then the frozen rows won't be visible after xact cleanup. Note
* that the stronger test of exactly which subtransaction created it is
- * crucial for correctness of this optimisation.
+ * crucial for correctness of this optimization.
*/
if (cstate->freeze)
{
InitResultRelInfo(resultRelInfo,
cstate->rel,
1, /* dummy rangetable index */
+ NULL,
0);
ExecOpenIndices(resultRelInfo, false);
* BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
* expressions. Such triggers or expressions might query the table we're
* inserting to, and act differently if the tuples that have already been
- * processed and prepared for insertion are not there.
+ * processed and prepared for insertion are not there. We also can't do
+ * it if the table is partitioned.
*/
if ((resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+ cstate->partition_dispatch_info != NULL ||
cstate->volatile_defexprs)
{
useHeapMultiInsert = false;
slot = myslot;
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+ /* Determine the partition to heap_insert the tuple into */
+ if (cstate->partition_dispatch_info)
+ {
+ int leaf_part_index;
+ TupleConversionMap *map;
+
+ /*
+ * Away we go ... If we end up not finding a partition after all,
+ * ExecFindPartition() does not return and errors out instead.
+ * Otherwise, the returned value is to be used as an index into
+ * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+ * will get us the ResultRelInfo and TupleConversionMap for the
+ * partition, respectively.
+ */
+ leaf_part_index = ExecFindPartition(resultRelInfo,
+ cstate->partition_dispatch_info,
+ slot,
+ estate);
+ Assert(leaf_part_index >= 0 &&
+ leaf_part_index < cstate->num_partitions);
+
+ /*
+ * If this tuple is mapped to a partition that is not same as the
+ * previous one, we'd better make the bulk insert mechanism gets a
+ * new buffer.
+ */
+ if (prev_leaf_part_index != leaf_part_index)
+ {
+ ReleaseBulkInsertStatePin(bistate);
+ prev_leaf_part_index = leaf_part_index;
+ }
+
+ /*
+ * Save the old ResultRelInfo and switch to the one corresponding
+ * to the selected partition.
+ */
+ saved_resultRelInfo = resultRelInfo;
+ resultRelInfo = cstate->partitions + leaf_part_index;
+
+ /* We do not yet have a way to insert into a foreign partition */
+ if (resultRelInfo->ri_FdwRoutine)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot route inserted tuples to a foreign table")));
+
+ /*
+ * For ExecInsertIndexTuples() to work on the partition's indexes
+ */
+ estate->es_result_relation_info = resultRelInfo;
+
+ /*
+ * We might need to convert from the parent rowtype to the
+ * partition rowtype.
+ */
+ map = cstate->partition_tupconv_maps[leaf_part_index];
+ if (map)
+ {
+ Relation partrel = resultRelInfo->ri_RelationDesc;
+
+ tuple = do_convert_tuple(tuple, map);
+
+ /*
+ * We must use the partition's tuple descriptor from this
+ * point on. Use a dedicated slot from this point on until
+ * we're finished dealing with the partition.
+ */
+ slot = cstate->partition_tuple_slot;
+ Assert(slot != NULL);
+ ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
+ ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+ }
+
+ tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+ }
+
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
if (!skip_tuple)
{
- /* Check the constraints of the tuple */
- if (cstate->rel->rd_att->constr)
- ExecConstraints(resultRelInfo, slot, estate);
-
- if (useHeapMultiInsert)
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
{
- /* Add this tuple to the tuple buffer */
- if (nBufferedTuples == 0)
- firstBufferedLineNo = cstate->cur_lineno;
- bufferedTuples[nBufferedTuples++] = tuple;
- bufferedTuplesSize += tuple->t_len;
-
- /*
- * If the buffer filled up, flush it. Also flush if the total
- * size of all the tuples in the buffer becomes large, to
- * avoid using large amounts of memory for the buffers when
- * the tuples are exceptionally wide.
- */
- if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
- bufferedTuplesSize > 65535)
- {
- CopyFromInsertBatch(cstate, estate, mycid, hi_options,
- resultRelInfo, myslot, bistate,
- nBufferedTuples, bufferedTuples,
- firstBufferedLineNo);
- nBufferedTuples = 0;
- bufferedTuplesSize = 0;
- }
+ /* Pass the data to the INSTEAD ROW INSERT trigger */
+ ExecIRInsertTriggers(estate, resultRelInfo, slot);
}
else
{
- List *recheckIndexes = NIL;
+ /* Check the constraints of the tuple */
+ if (cstate->rel->rd_att->constr ||
+ resultRelInfo->ri_PartitionCheck)
+ ExecConstraints(resultRelInfo, slot, estate);
+
+ if (useHeapMultiInsert)
+ {
+ /* Add this tuple to the tuple buffer */
+ if (nBufferedTuples == 0)
+ firstBufferedLineNo = cstate->cur_lineno;
+ bufferedTuples[nBufferedTuples++] = tuple;
+ bufferedTuplesSize += tuple->t_len;
- /* OK, store the tuple and create index entries for it */
- heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+ /*
+ * If the buffer filled up, flush it. Also flush if the
+ * total size of all the tuples in the buffer becomes
+ * large, to avoid using large amounts of memory for the
+ * buffer when the tuples are exceptionally wide.
+ */
+ if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+ bufferedTuplesSize > 65535)
+ {
+ CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ resultRelInfo, myslot, bistate,
+ nBufferedTuples, bufferedTuples,
+ firstBufferedLineNo);
+ nBufferedTuples = 0;
+ bufferedTuplesSize = 0;
+ }
+ }
+ else
+ {
+ List *recheckIndexes = NIL;
+
+ /* OK, store the tuple and create index entries for it */
+ heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
+ hi_options, bistate);
- if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
- estate, false, NULL,
- NIL);
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(slot,
+ &(tuple->t_self),
+ estate,
+ false,
+ NULL,
+ NIL);
- /* AFTER ROW INSERT Triggers */
- ExecARInsertTriggers(estate, resultRelInfo, tuple,
- recheckIndexes);
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, tuple,
+ recheckIndexes);
- list_free(recheckIndexes);
+ list_free(recheckIndexes);
+ }
}
/*
* tuples inserted by an INSERT command.
*/
processed++;
+
+ if (saved_resultRelInfo)
+ {
+ resultRelInfo = saved_resultRelInfo;
+ estate->es_result_relation_info = resultRelInfo;
+ }
}
}
ExecCloseIndices(resultRelInfo);
+ /* Close all the partitioned tables, leaf partitions, and their indices */
+ if (cstate->partition_dispatch_info)
+ {
+ int i;
+
+ /*
+ * Remember cstate->partition_dispatch_info[0] corresponds to the root
+ * partitioned table, which we must not try to close, because it is
+ * the main target table of COPY that will be closed eventually by
+ * DoCopy(). Also, tupslot is NULL for the root partitioned table.
+ */
+ for (i = 1; i < cstate->num_dispatch; i++)
+ {
+ PartitionDispatch pd = cstate->partition_dispatch_info[i];
+
+ heap_close(pd->reldesc, NoLock);
+ ExecDropSingleTupleTableSlot(pd->tupslot);
+ }
+ for (i = 0; i < cstate->num_partitions; i++)
+ {
+ ResultRelInfo *resultRelInfo = cstate->partitions + i;
+
+ ExecCloseIndices(resultRelInfo);
+ heap_close(resultRelInfo->ri_RelationDesc, NoLock);
+ }
+
+ /* Release the standalone partition tuple descriptor */
+ ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
+ }
+
+ /* Close any trigger target relations */
+ ExecCleanUpTriggerState(estate);
+
FreeExecutorState(estate);
/*
Relation rel,
const char *filename,
bool is_program,
+ copy_data_source_cb data_source_cb,
List *attnamelist,
List *options)
{
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
+ /* Assign range table, we'll need it in CopyFrom. */
+ if (pstate)
+ cstate->range_table = pstate->p_rtable;
+
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
* the special case of when the default expression is the
* nextval() of a sequence which in this specific case is
* known to be safe for use with the multi-insert
- * optimisation. Hence we use this special case function
+ * optimization. Hence we use this special case function
* checker rather than the standard check for
* contain_volatile_functions().
*/
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
- if (pipe)
+ if (data_source_cb)
+ {
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_source_cb = data_source_cb;
+ }
+ else if (pipe)
{
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
- &nulls[defmap[i]], NULL);
+ &nulls[defmap[i]]);
}
return true;