*/
typedef enum CopyInsertMethod
{
- CIM_SINGLE, /* use heap_insert or fdw routine */
- CIM_MULTI, /* always use heap_multi_insert */
- CIM_MULTI_CONDITIONAL /* use heap_multi_insert only if valid */
+ CIM_SINGLE, /* use table_insert or fdw routine */
+ CIM_MULTI, /* always use table_multi_insert */
+ CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */
} CopyInsertMethod;
/*
} DR_copy;
+/*
+ * No more than this many tuples per CopyMultiInsertBuffer
+ *
+ * Caution: Don't make this too big, as we could end up with this many
+ * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
+ * multiInsertBuffers list. Increasing this can cause quadratic growth in
+ * memory requirements during copies into partitioned tables with a large
+ * number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES 1000
+
+/*
+ * Flush the buffers once they hold at least this many bytes of tuples, as
+ * counted by the raw input line size.
+ */
+#define MAX_BUFFERED_BYTES 65535
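+
+/*
+ * The byte-based limit prevents the buffers from consuming large amounts of
+ * memory when the incoming tuples are exceptionally wide.
+ */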
+
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS 32
+
+/* Stores multi-insert data related to a single relation in CopyFrom. */
+typedef struct CopyMultiInsertBuffer
+{
+ TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ ResultRelInfo *resultRelInfo; /* ResultRelInfo for the target relation */
+ BulkInsertState bistate; /* BulkInsertState for this rel */
+ int nused; /* number of 'slots' containing tuples */
+ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
+ * stream */
+} CopyMultiInsertBuffer;
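+
+/*
+ * Entries in 'slots' are created lazily, the first time each array index is
+ * needed, and 'linenos' runs parallel to 'slots' so that an error raised
+ * while flushing a deferred insert can still report the originating line of
+ * the COPY stream.
+ */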
+
+/*
+ * Stores one or many CopyMultiInsertBuffers and details about the size and
+ * number of tuples which are stored in them. This allows multiple buffers to
+ * exist at once when COPYing into a partitioned table.
+ */
+typedef struct CopyMultiInsertInfo
+{
+ List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
+ int bufferedTuples; /* number of tuples buffered over all buffers */
+ int bufferedBytes; /* number of bytes from all buffered tuples */
+ CopyState cstate; /* Copy state for this CopyMultiInsertInfo */
+ EState *estate; /* Executor state used for COPY */
+ CommandId mycid; /* Command Id used for COPY */
+ int ti_options; /* table insert options */
+} CopyMultiInsertInfo;
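+
+/*
+ * A CopyMultiInsertInfo is set up once per COPY by CopyMultiInsertInfoInit()
+ * and torn down by CopyMultiInsertInfoCleanup(); the per-row usage pattern
+ * is sketched just above CopyFrom() below.
+ */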
+
+
/*
* These macros centralize code used to process line_buf and raw_buf buffers.
* They are macros because they often do continue/break control and to avoid
static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
-static void CopyOneRowTo(CopyState cstate,
- Datum *values, bool *nulls);
-static void CopyFromInsertBatch(CopyState cstate, EState *estate,
- CommandId mycid, int ti_options,
- ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
- BulkInsertState bistate,
- int nBufferedTuples, HeapTuple *bufferedTuples,
- uint64 firstBufferedLineNo);
+static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate);
Relation rel;
Oid relid;
RawStmt *query = NULL;
- Node *whereClause = NULL;
+ Node *whereClause = NULL;
/*
* Disallow COPY to/from file or program except to users with the
/* Transform the raw expression tree */
whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
- /* Make sure it yields a boolean result. */
+ /* Make sure it yields a boolean result. */
whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
/* we have to fix its collations too */
if (cstate->rel)
{
- Datum *values;
- bool *nulls;
+ TupleTableSlot *slot;
TableScanDesc scandesc;
- HeapTuple tuple;
-
- values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
- nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+ slot = table_slot_create(cstate->rel, NULL);
processed = 0;
- while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
+ while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
{
CHECK_FOR_INTERRUPTS();
- /* Deconstruct the tuple ... faster than repeated heap_getattr */
- heap_deform_tuple(tuple, tupDesc, values, nulls);
+ /* Deconstruct the tuple ... */
+ slot_getallattrs(slot);
/* Format and send the data */
- CopyOneRowTo(cstate, values, nulls);
+ CopyOneRowTo(cstate, slot);
processed++;
}
+ ExecDropSingleTupleTableSlot(slot);
table_endscan(scandesc);
-
- pfree(values);
- pfree(nulls);
}
else
{
* Emit one row during CopyTo().
*/
static void
-CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
+CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
{
bool need_delim = false;
FmgrInfo *out_functions = cstate->out_functions;
CopySendInt16(cstate, list_length(cstate->attnumlist));
}
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
- Datum value = values[attnum - 1];
- bool isnull = nulls[attnum - 1];
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
if (!cstate->binary)
{
return res;
}
+/*
+ * Allocate memory and initialize a new CopyMultiInsertBuffer for this
+ * ResultRelInfo.
+ */
+static CopyMultiInsertBuffer *
+CopyMultiInsertBufferInit(ResultRelInfo *rri)
+{
+ CopyMultiInsertBuffer *buffer;
+
+ buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
+ memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ buffer->resultRelInfo = rri;
+ buffer->bistate = GetBulkInsertState();
+ buffer->nused = 0;
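+
+ /* 'linenos' needs no initialization; only the first 'nused' entries are
+ * ever read. */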
+
+ return buffer;
+}
+
+/*
+ * Make a new buffer for this ResultRelInfo.
+ */
+static inline void
+CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
+ ResultRelInfo *rri)
+{
+ CopyMultiInsertBuffer *buffer;
+
+ buffer = CopyMultiInsertBufferInit(rri);
+
+ /* Set up a back-link so we can easily find this buffer again */
+ rri->ri_CopyMultiInsertBuffer = buffer;
+ /* Record that we're tracking this buffer */
+ miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+}
+
+/*
+ * Initialize an already allocated CopyMultiInsertInfo.
+ *
+ * If 'rri' refers to a non-partitioned table then a CopyMultiInsertBuffer
+ * is set up for that table.
+ */
+static void
+CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+ CopyState cstate, EState *estate, CommandId mycid,
+ int ti_options)
+{
+ miinfo->multiInsertBuffers = NIL;
+ miinfo->bufferedTuples = 0;
+ miinfo->bufferedBytes = 0;
+ miinfo->cstate = cstate;
+ miinfo->estate = estate;
+ miinfo->mycid = mycid;
+ miinfo->ti_options = ti_options;
+
+ /*
+ * Only set up the buffer when not dealing with a partitioned table.
+ * Buffers for partitioned tables will just be set up when we first need
+ * to send tuples their way.
+ */
+ if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+}
+
+/*
+ * Returns true if the buffers are full
+ */
+static inline bool
+CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
+{
+ return (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+ miinfo->bufferedBytes >= MAX_BUFFERED_BYTES);
+}
+
+/*
+ * Returns true if we have no buffered tuples
+ */
+static inline bool
+CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
+{
+ return miinfo->bufferedTuples == 0;
+}
+
+/*
+ * Write the tuples stored in 'buffer' out to the table.
+ */
+static inline void
+CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
+ CopyMultiInsertBuffer *buffer)
+{
+ MemoryContext oldcontext;
+ int i;
+ uint64 save_cur_lineno;
+ CopyState cstate = miinfo->cstate;
+ EState *estate = miinfo->estate;
+ CommandId mycid = miinfo->mycid;
+ int ti_options = miinfo->ti_options;
+ bool line_buf_valid = cstate->line_buf_valid;
+ int nused = buffer->nused;
+ ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+ TupleTableSlot **slots = buffer->slots;
+
+ /*
+ * Print error context information correctly if one of the operations
+ * below fails.
+ */
+ cstate->line_buf_valid = false;
+ save_cur_lineno = cstate->cur_lineno;
+
+ /*
+ * table_multi_insert may leak memory, so switch to a short-lived memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ table_multi_insert(resultRelInfo->ri_RelationDesc,
+ slots,
+ nused,
+ mycid,
+ ti_options,
+ buffer->bistate);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < nused; i++)
+ {
+ /*
+ * If there are any indexes, update them for all the inserted tuples,
+ * and run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = buffer->linenos[i];
+ recheckIndexes =
+ ExecInsertIndexTuples(slots[i], estate, false, NULL, NIL);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], recheckIndexes,
+ cstate->transition_capture);
+ list_free(recheckIndexes);
+ }
+
+ /*
+ * There are no indexes, but see if we need to run AFTER ROW INSERT
+ * triggers anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ cstate->cur_lineno = buffer->linenos[i];
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], NIL, cstate->transition_capture);
+ }
+
+ ExecClearTuple(slots[i]);
+ }
+
+ /* Mark that all slots are free; they remain allocated for reuse */
+ buffer->nused = 0;
+
+ /* reset cur_lineno and line_buf_valid to what they were */
+ cstate->line_buf_valid = line_buf_valid;
+ cstate->cur_lineno = save_cur_lineno;
+}
+
+/*
+ * Drop the used slots and free the members of this buffer.
+ *
+ * The buffer must be flushed before cleanup.
+ */
+static inline void
+CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer)
+{
+ int i;
+
+ /* Ensure buffer was flushed */
+ Assert(buffer->nused == 0);
+
+ /* Remove back-link to ourself */
+ buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+
+ FreeBulkInsertState(buffer->bistate);
+
+ /* Since we only create slots on demand, just drop the non-null ones. */
+ for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(buffer->slots[i]);
+
+ pfree(buffer);
+}
+
+/*
+ * Write out all the tuples stored in all the buffers to the tables.
+ *
+ * Once flushed we also trim the tracked buffers list down to size by removing
+ * the buffers created earliest first.
+ *
+ * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
+ * used. When cleaning up old buffers we'll never remove the one for
+ * 'curr_rri'.
+ */
+static inline void
+CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+{
+ ListCell *lc;
+
+ foreach(lc, miinfo->multiInsertBuffers)
+ {
+ CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
+
+ CopyMultiInsertBufferFlush(miinfo, buffer);
+ }
+
+ miinfo->bufferedTuples = 0;
+ miinfo->bufferedBytes = 0;
+
+ /*
+ * Trim the list of tracked buffers down if it exceeds the limit. Here we
+ * remove buffers starting with the ones we created first, on the theory
+ * that older buffers are less likely to be needed again than ones that
+ * were just created.
+ */
+ while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+ {
+ CopyMultiInsertBuffer *buffer;
+
+ buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+
+ /*
+ * We never want to remove the buffer that's currently being used, so
+ * if we happen to find it first, move it to the end of the list.
+ */
+ if (buffer->resultRelInfo == curr_rri)
+ {
+ miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+ miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+ buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+ }
+
+ CopyMultiInsertBufferCleanup(buffer);
+ miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+ }
+}
+
+/*
+ * Clean up allocated buffers and free memory
+ */
+static inline void
+CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
+{
+ ListCell *lc;
+
+ foreach(lc, miinfo->multiInsertBuffers)
+ CopyMultiInsertBufferCleanup(lfirst(lc));
+
+ list_free(miinfo->multiInsertBuffers);
+}
+
+/*
+ * Return the TupleTableSlot in which the next tuple should be stored.
+ *
+ * Callers must ensure that the buffer is not full.
+ */
+static inline TupleTableSlot *
+CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
+ ResultRelInfo *rri)
+{
+ CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+ int nused;
+
+ Assert(buffer != NULL);
+ nused = buffer->nused;
+ Assert(nused < MAX_BUFFERED_TUPLES);
+
+ if (buffer->slots[nused] == NULL)
+ buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+ return buffer->slots[nused];
+}
+
+/*
+ * Record the TupleTableSlot previously reserved by
+ * CopyMultiInsertInfoNextFreeSlot as being consumed.
+ */
+static inline void
+CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+ TupleTableSlot *slot, int tuplen, uint64 lineno)
+{
+ CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+
+ Assert(buffer != NULL);
+ Assert(slot == buffer->slots[buffer->nused]);
+
+ /* Store the line number so we can properly report any errors later */
+ buffer->linenos[buffer->nused] = lineno;
+
+ /* Record this slot as being used */
+ buffer->nused++;
+
+ /* Update how many tuples are stored and their size */
+ miinfo->bufferedTuples++;
+ miinfo->bufferedBytes += tuplen;
+}
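+
+/*
+ * A minimal sketch of the per-row pattern CopyFrom() uses below:
+ *
+ *    slot = CopyMultiInsertInfoNextFreeSlot(&miinfo, rri);
+ *    ... fill the slot, then ExecMaterializeSlot(slot) ...
+ *    CopyMultiInsertInfoStore(&miinfo, rri, slot, tuplen, lineno);
+ *    if (CopyMultiInsertInfoIsFull(&miinfo))
+ *        CopyMultiInsertInfoFlush(&miinfo, rri);
+ */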
+
/*
* Copy FROM file to relation.
*/
uint64
CopyFrom(CopyState cstate)
{
- HeapTuple tuple;
- TupleDesc tupDesc;
- Datum *values;
- bool *nulls;
ResultRelInfo *resultRelInfo;
ResultRelInfo *target_resultRelInfo;
ResultRelInfo *prevResultRelInfo = NULL;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
ModifyTableState *mtstate;
ExprContext *econtext;
- TupleTableSlot *myslot;
+ TupleTableSlot *singleslot = NULL;
MemoryContext oldcontext = CurrentMemoryContext;
- MemoryContext batchcontext;
PartitionTupleRouting *proute = NULL;
ErrorContextCallback errcallback;
CommandId mycid = GetCurrentCommandId(true);
int ti_options = 0; /* start with default table_insert options */
- BulkInsertState bistate;
+ BulkInsertState bistate = NULL;
CopyInsertMethod insertMethod;
+ CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
uint64 processed = 0;
- int nBufferedTuples = 0;
bool has_before_insert_row_trig;
bool has_instead_insert_row_trig;
bool leafpart_use_multi_insert = false;
-#define MAX_BUFFERED_TUPLES 1000
-#define RECHECK_MULTI_INSERT_THRESHOLD 1000
- HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
- Size bufferedTuplesSize = 0;
- uint64 firstBufferedLineNo = 0;
- uint64 lastPartitionSampleLineNo = 0;
- uint64 nPartitionChanges = 0;
- double avgTuplesPerPartChange = 0;
-
Assert(cstate->rel);
/*
RelationGetRelationName(cstate->rel))));
}
- tupDesc = RelationGetDescr(cstate->rel);
-
/*----------
* Check to see if we can avoid writing WAL
*
* FSM for free space is a waste of time, even if we must use WAL because
* of archiving. This could possibly be wrong, but it's unlikely.
*
- * The comments for heap_insert and RelationGetBufferForTuple specify that
+ * The comments for table_insert and RelationGetBufferForTuple specify that
* skipping WAL logging is only safe if we ensure that our tuples do not
* go into pages containing tuples from any other transactions --- but this
* must be the case if we have a new table or new relfilenode, so we need
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot perform FREEZE on a partitioned table")));
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot perform FREEZE on a partitioned table")));
}
/*
ExecInitRangeTable(estate, cstate->range_table);
- /* Set up a tuple slot too */
- myslot = ExecInitExtraTupleSlot(estate, tupDesc,
- &TTSOpsHeapTuple);
-
/*
* Set up a ModifyTableState so we can let FDW(s) init themselves for
* foreign-table result relation(s).
if (cstate->whereClause)
cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
- &mtstate->ps);
+ &mtstate->ps);
/*
- * It's more efficient to prepare a bunch of tuples for insertion, and
- * insert them in one heap_multi_insert() call, than call heap_insert()
- * separately for every tuple. However, there are a number of reasons why
- * we might not be able to do this. These are explained below.
+ * It's generally more efficient to prepare a bunch of tuples for
+ * insertion, and insert them in one table_multi_insert() call, than call
+ * table_insert() separately for every tuple. However, there are a number
+ * of reasons why we might not be able to do this. These are explained
+ * below.
*/
if (resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
* For partitioned tables we can't support multi-inserts when there
* are any statement level insert triggers. It might be possible to
* allow partitioned tables with such triggers in the future, but for
- * now, CopyFromInsertBatch expects that any before row insert and
- * statement level insert triggers are on the same relation.
+ * now, CopyMultiInsertInfoFlush expects that any before row insert
+ * and statement level insert triggers are on the same relation.
*/
insertMethod = CIM_SINGLE;
}
{
/*
* For partitioned tables, we may still be able to perform bulk
- * inserts for sets of consecutive tuples which belong to the same
- * partition. However, the possibility of this depends on which types
+ * inserts. However, the possibility of this depends on which types
* of triggers exist on the partition. We must disable bulk inserts
* if the partition is a foreign table or it has any before row insert
* or insert instead triggers (same as we checked above for the parent
* have the intermediate insert method of CIM_MULTI_CONDITIONAL to
* flag that we must later determine if we can use bulk-inserts for
* the partition being inserted into.
- *
- * Normally, when performing bulk inserts we just flush the insert
- * buffer whenever it becomes full, but for the partitioned table
- * case, we flush it whenever the current tuple does not belong to the
- * same partition as the previous tuple.
*/
if (proute)
insertMethod = CIM_MULTI_CONDITIONAL;
else
insertMethod = CIM_MULTI;
- bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+ CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
+ estate, mycid, ti_options);
+ }
+
+ /*
+ * If not using batch mode (which allocates slots as needed), set up a
+ * tuple slot too. When inserting into a partitioned table, we also need
+ * one, even if we might batch insert, to read the tuple in the root
+ * partition's form.
+ */
+ if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
+ {
+ singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+ &estate->es_tupleTable);
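+
+ /*
+ * This bistate is used only for single inserts; each
+ * CopyMultiInsertBuffer carries its own BulkInsertState for the batched
+ * path.
+ */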
+ bistate = GetBulkInsertState();
}
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
*/
ExecBSInsertTriggers(estate, resultRelInfo);
- values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
- nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-
- bistate = GetBulkInsertState();
econtext = GetPerTupleExprContext(estate);
/* Set up callback to identify error line number */
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
- /*
- * Set up memory context for batches. For cases without batching we could
- * use the per-tuple context, but it's simpler to just use it every time.
- */
- batchcontext = AllocSetContextCreate(CurrentMemoryContext,
- "batch context",
- ALLOCSET_DEFAULT_SIZES);
-
for (;;)
{
- TupleTableSlot *slot;
+ TupleTableSlot *myslot;
bool skip_tuple;
CHECK_FOR_INTERRUPTS();
*/
ResetPerTupleExprContext(estate);
+ /* select slot to (initially) load row into */
+ if (insertMethod == CIM_SINGLE || proute)
+ {
+ myslot = singleslot;
+ Assert(myslot != NULL);
+ }
+ else
+ {
+ Assert(resultRelInfo == target_resultRelInfo);
+ Assert(insertMethod == CIM_MULTI);
+
+ myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+ resultRelInfo);
+ }
+
/*
* Switch to per-tuple context before calling NextCopyFrom, which does
* evaluate default expressions etc. and requires per-tuple context.
*/
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- if (!NextCopyFrom(cstate, econtext, values, nulls))
- break;
+ ExecClearTuple(myslot);
- /* Switch into per-batch memory context before forming the tuple. */
- MemoryContextSwitchTo(batchcontext);
+ /* Directly store the values/nulls array in the slot */
+ if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+ break;
- /* And now we can form the input tuple. */
- tuple = heap_form_tuple(tupDesc, values, nulls);
+ ExecStoreVirtualTuple(myslot);
/*
- * Constraints might reference the tableoid column, so (re-)initialize
- * tts_tableOid before evaluating them.
+ * Constraints and where clause might reference the tableoid column,
+ * so (re-)initialize tts_tableOid before evaluating them.
*/
myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);
- /* Place tuple in tuple slot --- but slot shouldn't free it */
- slot = myslot;
- ExecStoreHeapTuple(tuple, slot, false);
-
if (cstate->whereClause)
{
econtext->ecxt_scantuple = myslot;
+ /* Skip items that don't match COPY's WHERE clause */
if (!ExecQual(cstate->qualexpr, econtext))
continue;
}
- /* Determine the partition to heap_insert the tuple into */
+ /* Determine the partition to insert the tuple into */
if (proute)
{
TupleConversionMap *map;
* if the found partition is not suitable for INSERTs.
*/
resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
- proute, slot, estate);
+ proute, myslot, estate);
if (prevResultRelInfo != resultRelInfo)
{
- /* Check if we can multi-insert into this partition */
- if (insertMethod == CIM_MULTI_CONDITIONAL)
- {
- /*
- * When performing bulk-inserts into partitioned tables we
- * must insert the tuples seen so far to the heap whenever
- * the partition changes.
- */
- if (nBufferedTuples > 0)
- {
- MemoryContext oldcontext;
-
- CopyFromInsertBatch(cstate, estate, mycid, ti_options,
- prevResultRelInfo, myslot, bistate,
- nBufferedTuples, bufferedTuples,
- firstBufferedLineNo);
- nBufferedTuples = 0;
- bufferedTuplesSize = 0;
-
- /*
- * The tuple is already allocated in the batch context, which
- * we want to reset. So to keep the tuple we copy it into the
- * short-lived (per-tuple) context, reset the batch context
- * and then copy it back into the per-batch one.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- tuple = heap_copytuple(tuple);
- MemoryContextSwitchTo(oldcontext);
-
- /* cleanup the old batch */
- MemoryContextReset(batchcontext);
-
- /* copy the tuple back to the per-batch context */
- oldcontext = MemoryContextSwitchTo(batchcontext);
- tuple = heap_copytuple(tuple);
- MemoryContextSwitchTo(oldcontext);
-
- /*
- * Also push the tuple copy to the slot (resetting the context
- * invalidated the slot contents).
- */
- ExecStoreHeapTuple(tuple, slot, false);
- }
-
- nPartitionChanges++;
-
- /*
- * Here we adaptively enable multi-inserts based on the
- * average number of tuples from recent multi-insert
- * batches. We recalculate the average every
- * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
- * the average over the whole copy. This allows us to
- * enable multi-inserts when we get periods in the copy
- * stream that have tuples commonly belonging to the same
- * partition, and disable when the partition is changing
- * too often.
- */
- if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
- RECHECK_MULTI_INSERT_THRESHOLD)
- && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
- {
- avgTuplesPerPartChange =
- (cstate->cur_lineno - lastPartitionSampleLineNo) /
- (double) nPartitionChanges;
-
- lastPartitionSampleLineNo = cstate->cur_lineno;
- nPartitionChanges = 0;
- }
- }
-
/* Determine which triggers exist on this partition */
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row);
resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
/*
- * Tests have shown that using multi-inserts when the
- * partition changes on every tuple slightly decreases the
- * performance, however, there are benefits even when only
- * some batches have just 2 tuples, so let's enable
- * multi-inserts even when the average is quite low.
+ * Disable multi-inserts when the partition has BEFORE/INSTEAD
+ * OF triggers, or if the partition is a foreign partition.
*/
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
- avgTuplesPerPartChange >= 1.3 &&
!has_before_insert_row_trig &&
!has_instead_insert_row_trig &&
resultRelInfo->ri_FdwRoutine == NULL;
- /*
- * We'd better make the bulk insert mechanism gets a new
- * buffer when the partition being inserted into changes.
- */
- ReleaseBulkInsertStatePin(bistate);
+ /* Set the multi-insert buffer to use for this partition. */
+ if (leafpart_use_multi_insert)
+ {
+ if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
+ CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
+ resultRelInfo);
+ }
+ else if (insertMethod == CIM_MULTI_CONDITIONAL &&
+ !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+ {
+ /*
+ * Flush pending inserts if this partition can't use
+ * batching, so rows are visible to triggers etc.
+ */
+ CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+ }
+
+ if (bistate != NULL)
+ ReleaseBulkInsertStatePin(bistate);
prevResultRelInfo = resultRelInfo;
}
* rowtype.
*/
map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
- if (map != NULL)
+ if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
{
- TupleTableSlot *new_slot;
- MemoryContext oldcontext;
-
- new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
- Assert(new_slot != NULL);
-
- slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
+ /* non-batch insert */
+ if (map != NULL)
+ {
+ TupleTableSlot *new_slot;
+ new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
+ myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+ }
+ }
+ else
+ {
/*
- * Get the tuple in the per-batch context, so that it will be
- * freed after each batch insert.
+ * Prepare to queue up the tuple for a later batch insert into
+ * the current partition.
*/
- oldcontext = MemoryContextSwitchTo(batchcontext);
- tuple = ExecCopySlotHeapTuple(slot);
- MemoryContextSwitchTo(oldcontext);
+ TupleTableSlot *batchslot;
+
+ /* no other path available for partitioned table */
+ Assert(insertMethod == CIM_MULTI_CONDITIONAL);
+
+ batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+ resultRelInfo);
+
+ if (map != NULL)
+ myslot = execute_attr_map_slot(map->attrMap, myslot,
+ batchslot);
+ else
+ {
+ /*
+ * This looks more expensive than it is (Believe me, I
+ * optimized it away. Twice.). The input is in virtual
+ * form, and we'll materialize the slot below - for most
+ * slot types the copy performs the work materialization
+ * would later require anyway.
+ */
+ ExecCopySlot(batchslot, myslot);
+ myslot = batchslot;
+ }
}
- slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+ /* ensure that triggers etc see the right relation */
+ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
if (has_before_insert_row_trig)
{
- if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+ if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
skip_tuple = true; /* "do nothing" */
}
*/
if (has_instead_insert_row_trig)
{
- ExecIRInsertTriggers(estate, resultRelInfo, slot);
+ ExecIRInsertTriggers(estate, resultRelInfo, myslot);
}
else
{
*/
if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
- {
- ExecComputeStoredGenerated(estate, slot);
- MemoryContextSwitchTo(batchcontext);
- tuple = ExecCopySlotHeapTuple(slot);
- MemoryContextSwitchTo(oldcontext);
- }
+ ExecComputeStoredGenerated(estate, myslot);
/*
* If the target is a plain table, check the constraints of
*/
if (resultRelInfo->ri_FdwRoutine == NULL &&
resultRelInfo->ri_RelationDesc->rd_att->constr)
- ExecConstraints(resultRelInfo, slot, estate);
+ ExecConstraints(resultRelInfo, myslot, estate);
/*
* Also check the tuple against the partition constraint, if
*/
if (resultRelInfo->ri_PartitionCheck &&
(proute == NULL || has_before_insert_row_trig))
- ExecPartitionCheck(resultRelInfo, slot, estate, true);
+ ExecPartitionCheck(resultRelInfo, myslot, estate, true);
- /*
- * Perform multi-inserts when enabled, or when loading a
- * partitioned table that can support multi-inserts as
- * determined above.
- */
+ /* Store the slot in the multi-insert buffer, when enabled. */
if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
{
+ /*
+ * The slot's contents might still point into the per-tuple
+ * context. For batching they need to be longer lived.
+ */
+ ExecMaterializeSlot(myslot);
+
/* Add this tuple to the tuple buffer */
- if (nBufferedTuples == 0)
- firstBufferedLineNo = cstate->cur_lineno;
- bufferedTuples[nBufferedTuples++] = tuple;
- bufferedTuplesSize += tuple->t_len;
+ CopyMultiInsertInfoStore(&multiInsertInfo,
+ resultRelInfo, myslot,
+ cstate->line_buf.len,
+ cstate->cur_lineno);
/*
- * If the buffer filled up, flush it. Also flush if the
- * total size of all the tuples in the buffer becomes
- * large, to avoid using large amounts of memory for the
- * buffer when the tuples are exceptionally wide.
+ * If enough inserts have queued up, then flush all
+ * buffers out to their tables.
*/
- if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
- bufferedTuplesSize > 65535)
- {
- CopyFromInsertBatch(cstate, estate, mycid, ti_options,
- resultRelInfo, myslot, bistate,
- nBufferedTuples, bufferedTuples,
- firstBufferedLineNo);
- nBufferedTuples = 0;
- bufferedTuplesSize = 0;
-
- /* free memory occupied by tuples from the batch */
- MemoryContextReset(batchcontext);
- }
+ if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
+ CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
}
else
{
/* OK, store the tuple */
if (resultRelInfo->ri_FdwRoutine != NULL)
{
- slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
- resultRelInfo,
- slot,
- NULL);
+ myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
+ resultRelInfo,
+ myslot,
+ NULL);
- if (slot == NULL) /* "do nothing" */
+ if (myslot == NULL) /* "do nothing" */
continue; /* next tuple please */
/*
* column, so (re-)initialize tts_tableOid before
* evaluating them.
*/
- slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
else
{
- tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
- heap_insert(resultRelInfo->ri_RelationDesc, tuple,
- mycid, ti_options, bistate);
- ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
- slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+ /* OK, store the tuple and create index entries for it */
+ table_insert(resultRelInfo->ri_RelationDesc, myslot,
+ mycid, ti_options, bistate);
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(myslot,
+ estate,
+ false,
+ NULL,
+ NIL);
}
- /* And create index entries for it */
- if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(slot,
- estate,
- false,
- NULL,
- NIL);
-
/* AFTER ROW INSERT Triggers */
- ExecARInsertTriggers(estate, resultRelInfo, slot,
+ ExecARInsertTriggers(estate, resultRelInfo, myslot,
recheckIndexes, cstate->transition_capture);
list_free(recheckIndexes);
}
/* Flush any remaining buffered tuples */
- if (nBufferedTuples > 0)
+ if (insertMethod != CIM_SINGLE)
{
- if (insertMethod == CIM_MULTI_CONDITIONAL)
- {
- CopyFromInsertBatch(cstate, estate, mycid, ti_options,
- prevResultRelInfo, myslot, bistate,
- nBufferedTuples, bufferedTuples,
- firstBufferedLineNo);
- }
- else
- CopyFromInsertBatch(cstate, estate, mycid, ti_options,
- resultRelInfo, myslot, bistate,
- nBufferedTuples, bufferedTuples,
- firstBufferedLineNo);
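+ /*
+ * Passing NULL for curr_rri is fine here; we're about to clean up all
+ * the buffers anyway, so none of them needs to be preserved.
+ */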
+ if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+ CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+
+ /* Tear down the multi-insert buffer data */
+ CopyMultiInsertInfoCleanup(&multiInsertInfo);
}
/* Done, clean up */
error_context_stack = errcallback.previous;
- FreeBulkInsertState(bistate);
+ if (bistate != NULL)
+ FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext);
- MemoryContextDelete(batchcontext);
-
/*
* In the old protocol, tell pqcomm that we can process normal protocol
* messages again.
/* Handle queued AFTER triggers */
AfterTriggerEndQuery(estate);
- pfree(values);
- pfree(nulls);
-
ExecResetTupleTable(estate->es_tupleTable, false);
/* Allow the FDW to shut down */
return processed;
}
-/*
- * A subroutine of CopyFrom, to write the current batch of buffered heap
- * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
- * triggers.
- */
-static void
-CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
- int ti_options, ResultRelInfo *resultRelInfo,
- TupleTableSlot *myslot, BulkInsertState bistate,
- int nBufferedTuples, HeapTuple *bufferedTuples,
- uint64 firstBufferedLineNo)
-{
- MemoryContext oldcontext;
- int i;
- uint64 save_cur_lineno;
- bool line_buf_valid = cstate->line_buf_valid;
-
- /*
- * Print error context information correctly, if one of the operations
- * below fail.
- */
- cstate->line_buf_valid = false;
- save_cur_lineno = cstate->cur_lineno;
-
- /*
- * heap_multi_insert leaks memory, so switch to short-lived memory context
- * before calling it.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- heap_multi_insert(resultRelInfo->ri_RelationDesc,
- bufferedTuples,
- nBufferedTuples,
- mycid,
- ti_options,
- bistate);
- MemoryContextSwitchTo(oldcontext);
-
- /*
- * If there are any indexes, update them for all the inserted tuples, and
- * run AFTER ROW INSERT triggers.
- */
- if (resultRelInfo->ri_NumIndices > 0)
- {
- for (i = 0; i < nBufferedTuples; i++)
- {
- List *recheckIndexes;
-
- cstate->cur_lineno = firstBufferedLineNo + i;
- ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
- recheckIndexes =
- ExecInsertIndexTuples(myslot,
- estate, false, NULL, NIL);
- ExecARInsertTriggers(estate, resultRelInfo,
- myslot,
- recheckIndexes, cstate->transition_capture);
- list_free(recheckIndexes);
- }
- }
-
- /*
- * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
- * anyway.
- */
- else if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_new_table))
- {
- for (i = 0; i < nBufferedTuples; i++)
- {
- cstate->cur_lineno = firstBufferedLineNo + i;
- ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
- ExecARInsertTriggers(estate, resultRelInfo,
- myslot,
- NIL, cstate->transition_capture);
- }
- }
-
- /* reset cur_lineno and line_buf_valid to what they were */
- cstate->line_buf_valid = line_buf_valid;
- cstate->cur_lineno = save_cur_lineno;
-}
-
/*
* Setup to read tuples from a file for COPY FROM.
*
DR_copy *myState = (DR_copy *) self;
CopyState cstate = myState->cstate;
- /* Make sure the tuple is fully deconstructed */
- slot_getallattrs(slot);
-
- /* And send the data */
- CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
+ /* Send the data */
+ CopyOneRowTo(cstate, slot);
myState->processed++;
return true;