]> granicus.if.org Git - postgresql/commitdiff
tableam: Add table_multi_insert() and revamp/speed-up COPY FROM buffering.
authorAndres Freund <andres@anarazel.de>
Thu, 4 Apr 2019 22:47:19 +0000 (15:47 -0700)
committerAndres Freund <andres@anarazel.de>
Thu, 4 Apr 2019 23:28:18 +0000 (16:28 -0700)
This adds table_multi_insert(), and converts COPY FROM, the only user
of heap_multi_insert, to it.

A simple conversion of COPY FROM use slots would have yielded a
slowdown when inserting into a partitioned table for some
workloads. Different partitions might need different slots (both slot
types and their descriptors), and dropping / creating slots when
there's constant partition changes is measurable.

Thus instead revamp the COPY FROM buffering for partitioned tables to
allow to buffer inserts into multiple tables, flushing only when
limits are reached across all partition buffers. By only dropping
slots when there've been inserts into too many different partitions,
the aforementioned overhead is gone. By allowing larger batches, even
when there are frequent partition changes, we actuall speed such cases
up significantly.

By using slots COPY of very narrow rows into unlogged / temporary
might slow down very slightly (due to the indirect function calls).

Author: David Rowley, Andres Freund, Haribabu Kommi
Discussion:
    https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
    https://postgr.es/m/20190327054923.t3epfuewxfqdt22e@alap3.anarazel.de

src/backend/access/heap/heapam.c
src/backend/access/heap/heapam_handler.c
src/backend/commands/copy.c
src/backend/executor/execMain.c
src/backend/executor/execPartition.c
src/include/access/heapam.h
src/include/access/tableam.h
src/include/nodes/execnodes.h
src/tools/pgindent/typedefs.list

index 05ceb6550d5e0718449cdb0d639a5460b749fead..a05b6a07ad0d1cab51263befbdb386c6c84cdbdb 100644 (file)
@@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
  * temporary context before calling this, if that's a problem.
  */
 void
-heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
                                  CommandId cid, int options, BulkInsertState bistate)
 {
        TransactionId xid = GetCurrentTransactionId();
@@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
        saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
                                                                                                   HEAP_DEFAULT_FILLFACTOR);
 
-       /* Toast and set header data in all the tuples */
+       /* Toast and set header data in all the slots */
        heaptuples = palloc(ntuples * sizeof(HeapTuple));
        for (i = 0; i < ntuples; i++)
-               heaptuples[i] = heap_prepare_insert(relation, tuples[i],
-                                                                                       xid, cid, options);
+       {
+               HeapTuple       tuple;
+
+               tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
+               slots[i]->tts_tableOid = RelationGetRelid(relation);
+               tuple->t_tableOid = slots[i]->tts_tableOid;
+               heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
+                                                                                       options);
+       }
 
        /*
         * We're about to do the actual inserts -- but check for conflict first,
@@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
                        CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
        }
 
-       /*
-        * Copy t_self fields back to the caller's original tuples. This does
-        * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's
-        * probably faster to always copy than check.
-        */
+       /* copy t_self fields back to the caller's slots */
        for (i = 0; i < ntuples; i++)
-               tuples[i]->t_self = heaptuples[i]->t_self;
+               slots[i]->tts_tid = heaptuples[i]->t_self;
 
        pgstat_count_heap_insert(relation, ntuples);
 }
index 6693d7eb2dd1f550f74ad9efb4e8add5b3c25a6f..add0d65f81655ed90357c7a67a2f398808d343ad 100644 (file)
@@ -2516,6 +2516,7 @@ static const TableAmRoutine heapam_methods = {
        .tuple_insert = heapam_tuple_insert,
        .tuple_insert_speculative = heapam_tuple_insert_speculative,
        .tuple_complete_speculative = heapam_tuple_complete_speculative,
+       .multi_insert = heap_multi_insert,
        .tuple_delete = heapam_tuple_delete,
        .tuple_update = heapam_tuple_update,
        .tuple_lock = heapam_tuple_lock,
index c1fd7b78ce9d1a5daf297fb2bd530f9193047300..3f63ba4caaca354ce9d19592acd6e5ea1dfebc4b 100644 (file)
@@ -90,9 +90,9 @@ typedef enum EolType
  */
 typedef enum CopyInsertMethod
 {
-       CIM_SINGLE,                                     /* use heap_insert or fdw routine */
-       CIM_MULTI,                                      /* always use heap_multi_insert */
-       CIM_MULTI_CONDITIONAL           /* use heap_multi_insert only if valid */
+       CIM_SINGLE,                                     /* use table_insert or fdw routine */
+       CIM_MULTI,                                      /* always use table_multi_insert */
+       CIM_MULTI_CONDITIONAL           /* use table_multi_insert only if valid */
 } CopyInsertMethod;
 
 /*
@@ -236,6 +236,55 @@ typedef struct
 } DR_copy;
 
 
+/*
+ * No more than this many tuples per CopyMultiInsertBuffer
+ *
+ * Caution: Don't make this too big, as we could end up with this many
+ * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
+ * multiInsertBuffers list.  Increasing this can cause quadratic growth in
+ * memory requirements during copies into partitioned tables with a large
+ * number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES            1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size, of tuples stored.
+ */
+#define MAX_BUFFERED_BYTES             65535
+
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS  32
+
+/* Stores multi-insert data related to a single relation in CopyFrom. */
+typedef struct CopyMultiInsertBuffer
+{
+       TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+       ResultRelInfo *resultRelInfo;   /* ResultRelInfo for 'relid' */
+       BulkInsertState bistate;        /* BulkInsertState for this rel */
+       int                     nused;                  /* number of 'slots' containing tuples */
+       uint64          linenos[MAX_BUFFERED_TUPLES];   /* Line # of tuple in copy
+                                                                                                * stream */
+} CopyMultiInsertBuffer;
+
+/*
+ * Stores one or many CopyMultiInsertBuffers and details about the size and
+ * number of tuples which are stored in them.  This allows multiple buffers to
+ * exist at once when COPYing into a partitioned table.
+ */
+typedef struct CopyMultiInsertInfo
+{
+       List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
+       int                     bufferedTuples; /* number of tuples buffered over all buffers */
+       int                     bufferedBytes;  /* number of bytes from all buffered tuples */
+       int                     nbuffers;               /* number of buffers we're tracking */
+       CopyState       cstate;                 /* Copy state for this CopyMultiInsertInfo */
+       EState     *estate;                     /* Executor state used for COPY */
+       CommandId       mycid;                  /* Command Id used for COPY */
+       int                     ti_options;             /* table insert options */
+} CopyMultiInsertInfo;
+
+
 /*
  * These macros centralize code used to process line_buf and raw_buf buffers.
  * They are macros because they often do continue/break control and to avoid
@@ -316,14 +365,7 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
-static void CopyOneRowTo(CopyState cstate,
-                        Datum *values, bool *nulls);
-static void CopyFromInsertBatch(CopyState cstate, EState *estate,
-                                       CommandId mycid, int ti_options,
-                                       ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
-                                       BulkInsertState bistate,
-                                       int nBufferedTuples, HeapTuple *bufferedTuples,
-                                       uint64 firstBufferedLineNo);
+static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
 static int     CopyReadAttributesText(CopyState cstate);
@@ -806,7 +848,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
        Relation        rel;
        Oid                     relid;
        RawStmt    *query = NULL;
-       Node    *whereClause = NULL;
+       Node       *whereClause = NULL;
 
        /*
         * Disallow COPY to/from file or program except to users with the
@@ -868,7 +910,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
                        /* Transform the raw expression tree */
                        whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
 
-                       /*  Make sure it yields a boolean result. */
+                       /* Make sure it yields a boolean result. */
                        whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
 
                        /* we have to fix its collations too */
@@ -2073,33 +2115,27 @@ CopyTo(CopyState cstate)
 
        if (cstate->rel)
        {
-               Datum      *values;
-               bool       *nulls;
+               TupleTableSlot *slot;
                TableScanDesc scandesc;
-               HeapTuple       tuple;
-
-               values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
-               nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
 
                scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+               slot = table_slot_create(cstate->rel, NULL);
 
                processed = 0;
-               while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
+               while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
                {
                        CHECK_FOR_INTERRUPTS();
 
-                       /* Deconstruct the tuple ... faster than repeated heap_getattr */
-                       heap_deform_tuple(tuple, tupDesc, values, nulls);
+                       /* Deconstruct the tuple ... */
+                       slot_getallattrs(slot);
 
                        /* Format and send the data */
-                       CopyOneRowTo(cstate, values, nulls);
+                       CopyOneRowTo(cstate, slot);
                        processed++;
                }
 
+               ExecDropSingleTupleTableSlot(slot);
                table_endscan(scandesc);
-
-               pfree(values);
-               pfree(nulls);
        }
        else
        {
@@ -2125,7 +2161,7 @@ CopyTo(CopyState cstate)
  * Emit one row during CopyTo().
  */
 static void
-CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
+CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
 {
        bool            need_delim = false;
        FmgrInfo   *out_functions = cstate->out_functions;
@@ -2142,11 +2178,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
                CopySendInt16(cstate, list_length(cstate->attnumlist));
        }
 
+       /* Make sure the tuple is fully deconstructed */
+       slot_getallattrs(slot);
+
        foreach(cur, cstate->attnumlist)
        {
                int                     attnum = lfirst_int(cur);
-               Datum           value = values[attnum - 1];
-               bool            isnull = nulls[attnum - 1];
+               Datum           value = slot->tts_values[attnum - 1];
+               bool            isnull = slot->tts_isnull[attnum - 1];
 
                if (!cstate->binary)
                {
@@ -2305,47 +2344,337 @@ limit_printout_length(const char *str)
        return res;
 }
 
+/*
+ * Allocate memory and initialize a new CopyMultiInsertBuffer for this
+ * ResultRelInfo.
+ */
+static CopyMultiInsertBuffer *
+CopyMultiInsertBufferInit(ResultRelInfo *rri)
+{
+       CopyMultiInsertBuffer *buffer;
+
+       buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
+       memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+       buffer->resultRelInfo = rri;
+       buffer->bistate = GetBulkInsertState();
+       buffer->nused = 0;
+
+       return buffer;
+}
+
+/*
+ * Make a new buffer for this ResultRelInfo.
+ */
+static inline void
+CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
+                                                          ResultRelInfo *rri)
+{
+       CopyMultiInsertBuffer *buffer;
+
+       buffer = CopyMultiInsertBufferInit(rri);
+
+       /* Setup back-link so we can easily find this buffer again */
+       rri->ri_CopyMultiInsertBuffer = buffer;
+       /* Record that we're tracking this buffer */
+       miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+}
+
+/*
+ * Initialize an already allocated CopyMultiInsertInfo.
+ *
+ * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
+ * for that table.
+ */
+static void
+CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+                                               CopyState cstate, EState *estate, CommandId mycid,
+                                               int ti_options)
+{
+       miinfo->multiInsertBuffers = NIL;
+       miinfo->bufferedTuples = 0;
+       miinfo->bufferedBytes = 0;
+       miinfo->nbuffers = 0;
+       miinfo->cstate = cstate;
+       miinfo->estate = estate;
+       miinfo->mycid = mycid;
+       miinfo->ti_options = ti_options;
+
+       /*
+        * Only setup the buffer when not dealing with a partitioned table.
+        * Buffers for partitioned tables will just be setup when we need to send
+        * tuples their way for the first time.
+        */
+       if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+               CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+}
+
+/*
+ * Returns true if the buffers are full
+ */
+static inline bool
+CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
+{
+       if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+               miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+               return true;
+       return false;
+}
+
+/*
+ * Returns true if we have no buffered tuples
+ */
+static inline bool
+CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
+{
+       return miinfo->bufferedTuples == 0;
+}
+
+/*
+ * Write the tuples stored in 'buffer' out to the table.
+ */
+static inline void
+CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
+                                                  CopyMultiInsertBuffer *buffer)
+{
+       MemoryContext oldcontext;
+       int                     i;
+       uint64          save_cur_lineno;
+       CopyState       cstate = miinfo->cstate;
+       EState     *estate = miinfo->estate;
+       CommandId       mycid = miinfo->mycid;
+       int                     ti_options = miinfo->ti_options;
+       bool            line_buf_valid = cstate->line_buf_valid;
+       int                     nused = buffer->nused;
+       ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+       TupleTableSlot **slots = buffer->slots;
+
+       /*
+        * Print error context information correctly, if one of the operations
+        * below fail.
+        */
+       cstate->line_buf_valid = false;
+       save_cur_lineno = cstate->cur_lineno;
+
+       /*
+        * table_multi_insert may leak memory, so switch to short-lived memory
+        * context before calling it.
+        */
+       oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+       table_multi_insert(resultRelInfo->ri_RelationDesc,
+                                          slots,
+                                          nused,
+                                          mycid,
+                                          ti_options,
+                                          buffer->bistate);
+       MemoryContextSwitchTo(oldcontext);
+
+       for (i = 0; i < nused; i++)
+       {
+               /*
+                * If there are any indexes, update them for all the inserted tuples,
+                * and run AFTER ROW INSERT triggers.
+                */
+               if (resultRelInfo->ri_NumIndices > 0)
+               {
+                       List       *recheckIndexes;
+
+                       cstate->cur_lineno = buffer->linenos[i];
+                       recheckIndexes =
+                               ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
+                                                                         NIL);
+                       ExecARInsertTriggers(estate, resultRelInfo,
+                                                                slots[i], recheckIndexes,
+                                                                cstate->transition_capture);
+                       list_free(recheckIndexes);
+               }
+
+               /*
+                * There's no indexes, but see if we need to run AFTER ROW INSERT
+                * triggers anyway.
+                */
+               else if (resultRelInfo->ri_TrigDesc != NULL &&
+                                (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+                                 resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+               {
+                       cstate->cur_lineno = buffer->linenos[i];
+                       ExecARInsertTriggers(estate, resultRelInfo,
+                                                                slots[i], NIL, cstate->transition_capture);
+               }
+
+               ExecClearTuple(slots[i]);
+       }
+
+       /* Mark that all slots are free */
+       buffer->nused = 0;
+
+       /* reset cur_lineno and line_buf_valid to what they were */
+       cstate->line_buf_valid = line_buf_valid;
+       cstate->cur_lineno = save_cur_lineno;
+}
+
+/*
+ * Drop used slots and free member for this buffer.
+ *
+ * The buffer must be flushed before cleanup.
+ */
+static inline void
+CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer)
+{
+       int                     i;
+
+       /* Ensure buffer was flushed */
+       Assert(buffer->nused == 0);
+
+       /* Remove back-link to ourself */
+       buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+
+       FreeBulkInsertState(buffer->bistate);
+
+       /* Since we only create slots on demand, just drop the non-null ones. */
+       for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+               ExecDropSingleTupleTableSlot(buffer->slots[i]);
+
+       pfree(buffer);
+}
+
+/*
+ * Write out all stored tuples in all buffers out to the tables.
+ *
+ * Once flushed we also trim the tracked buffers list down to size by removing
+ * the buffers created earliest first.
+ *
+ * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
+ * used.  When cleaning up old buffers we'll never remove the one for
+ * 'curr_rri'.
+ */
+static inline void
+CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+{
+       ListCell   *lc;
+
+       foreach(lc, miinfo->multiInsertBuffers)
+       {
+               CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
+
+               CopyMultiInsertBufferFlush(miinfo, buffer);
+       }
+
+       miinfo->bufferedTuples = 0;
+       miinfo->bufferedBytes = 0;
+
+       /*
+        * Trim the list of tracked buffers down if it exceeds the limit.  Here we
+        * remove buffers starting with the ones we created first.  It seems more
+        * likely that these older ones are less likely to be needed than ones
+        * that were just created.
+        */
+       while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+       {
+               CopyMultiInsertBuffer *buffer;
+
+               buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+
+               /*
+                * We never want to remove the buffer that's currently being used, so
+                * if we happen to find that then move it to the end of the list.
+                */
+               if (buffer->resultRelInfo == curr_rri)
+               {
+                       miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+                       miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+                       buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+               }
+
+               CopyMultiInsertBufferCleanup(buffer);
+               miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+       }
+}
+
+/*
+ * Cleanup allocated buffers and free memory
+ */
+static inline void
+CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
+{
+       ListCell   *lc;
+
+       foreach(lc, miinfo->multiInsertBuffers)
+               CopyMultiInsertBufferCleanup(lfirst(lc));
+
+       list_free(miinfo->multiInsertBuffers);
+}
+
+/*
+ * Get the next TupleTableSlot that the next tuple should be stored in.
+ *
+ * Callers must ensure that the buffer is not full.
+ */
+static inline TupleTableSlot *
+CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
+                                                               ResultRelInfo *rri)
+{
+       CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+       int                     nused = buffer->nused;
+
+       Assert(buffer != NULL);
+       Assert(nused < MAX_BUFFERED_TUPLES);
+
+       if (buffer->slots[nused] == NULL)
+               buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+       return buffer->slots[nused];
+}
+
+/*
+ * Record the previously reserved TupleTableSlot that was reserved by
+ * MultiInsertInfoNextFreeSlot as being consumed.
+ */
+static inline void
+CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+                                                TupleTableSlot *slot, int tuplen, uint64 lineno)
+{
+       CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+
+       Assert(buffer != NULL);
+       Assert(slot == buffer->slots[buffer->nused]);
+
+       /* Store the line number so we can properly report any errors later */
+       buffer->linenos[buffer->nused] = lineno;
+
+       /* Record this slot as being used */
+       buffer->nused++;
+
+       /* Update how many tuples are stored and their size */
+       miinfo->bufferedTuples++;
+       miinfo->bufferedBytes += tuplen;
+}
+
 /*
  * Copy FROM file to relation.
  */
 uint64
 CopyFrom(CopyState cstate)
 {
-       HeapTuple       tuple;
-       TupleDesc       tupDesc;
-       Datum      *values;
-       bool       *nulls;
        ResultRelInfo *resultRelInfo;
        ResultRelInfo *target_resultRelInfo;
        ResultRelInfo *prevResultRelInfo = NULL;
        EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
        ModifyTableState *mtstate;
        ExprContext *econtext;
-       TupleTableSlot *myslot;
+       TupleTableSlot *singleslot = NULL;
        MemoryContext oldcontext = CurrentMemoryContext;
-       MemoryContext batchcontext;
 
        PartitionTupleRouting *proute = NULL;
        ErrorContextCallback errcallback;
        CommandId       mycid = GetCurrentCommandId(true);
        int                     ti_options = 0; /* start with default table_insert options */
-       BulkInsertState bistate;
+       BulkInsertState bistate = NULL;
        CopyInsertMethod insertMethod;
+       CopyMultiInsertInfo multiInsertInfo = {0};      /* pacify compiler */
        uint64          processed = 0;
-       int                     nBufferedTuples = 0;
        bool            has_before_insert_row_trig;
        bool            has_instead_insert_row_trig;
        bool            leafpart_use_multi_insert = false;
 
-#define MAX_BUFFERED_TUPLES 1000
-#define RECHECK_MULTI_INSERT_THRESHOLD 1000
-       HeapTuple  *bufferedTuples = NULL;      /* initialize to silence warning */
-       Size            bufferedTuplesSize = 0;
-       uint64          firstBufferedLineNo = 0;
-       uint64          lastPartitionSampleLineNo = 0;
-       uint64          nPartitionChanges = 0;
-       double          avgTuplesPerPartChange = 0;
-
        Assert(cstate->rel);
 
        /*
@@ -2382,8 +2711,6 @@ CopyFrom(CopyState cstate)
                                                        RelationGetRelationName(cstate->rel))));
        }
 
-       tupDesc = RelationGetDescr(cstate->rel);
-
        /*----------
         * Check to see if we can avoid writing WAL
         *
@@ -2412,7 +2739,7 @@ CopyFrom(CopyState cstate)
         * FSM for free space is a waste of time, even if we must use WAL because
         * of archiving.  This could possibly be wrong, but it's unlikely.
         *
-        * The comments for heap_insert and RelationGetBufferForTuple specify that
+        * The comments for table_insert and RelationGetBufferForTuple specify that
         * skipping WAL logging is only safe if we ensure that our tuples do not
         * go into pages containing tuples from any other transactions --- but this
         * must be the case if we have a new table or new relfilenode, so we need
@@ -2467,8 +2794,8 @@ CopyFrom(CopyState cstate)
                if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
                {
                        ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                       errmsg("cannot perform FREEZE on a partitioned table")));
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("cannot perform FREEZE on a partitioned table")));
                }
 
                /*
@@ -2518,10 +2845,6 @@ CopyFrom(CopyState cstate)
 
        ExecInitRangeTable(estate, cstate->range_table);
 
-       /* Set up a tuple slot too */
-       myslot = ExecInitExtraTupleSlot(estate, tupDesc,
-                                                                       &TTSOpsHeapTuple);
-
        /*
         * Set up a ModifyTableState so we can let FDW(s) init themselves for
         * foreign-table result relation(s).
@@ -2562,13 +2885,14 @@ CopyFrom(CopyState cstate)
 
        if (cstate->whereClause)
                cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
-                                                                                               &mtstate->ps);
+                                                                               &mtstate->ps);
 
        /*
-        * It's more efficient to prepare a bunch of tuples for insertion, and
-        * insert them in one heap_multi_insert() call, than call heap_insert()
-        * separately for every tuple. However, there are a number of reasons why
-        * we might not be able to do this.  These are explained below.
+        * It's generally more efficient to prepare a bunch of tuples for
+        * insertion, and insert them in one table_multi_insert() call, than call
+        * table_insert() separately for every tuple. However, there are a number
+        * of reasons why we might not be able to do this.  These are explained
+        * below.
         */
        if (resultRelInfo->ri_TrigDesc != NULL &&
                (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
@@ -2589,8 +2913,8 @@ CopyFrom(CopyState cstate)
                 * For partitioned tables we can't support multi-inserts when there
                 * are any statement level insert triggers. It might be possible to
                 * allow partitioned tables with such triggers in the future, but for
-                * now, CopyFromInsertBatch expects that any before row insert and
-                * statement level insert triggers are on the same relation.
+                * now, CopyMultiInsertInfoFlush expects that any before row insert
+                * and statement level insert triggers are on the same relation.
                 */
                insertMethod = CIM_SINGLE;
        }
@@ -2622,8 +2946,7 @@ CopyFrom(CopyState cstate)
        {
                /*
                 * For partitioned tables, we may still be able to perform bulk
-                * inserts for sets of consecutive tuples which belong to the same
-                * partition.  However, the possibility of this depends on which types
+                * inserts.  However, the possibility of this depends on which types
                 * of triggers exist on the partition.  We must disable bulk inserts
                 * if the partition is a foreign table or it has any before row insert
                 * or insert instead triggers (same as we checked above for the parent
@@ -2632,18 +2955,27 @@ CopyFrom(CopyState cstate)
                 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
                 * flag that we must later determine if we can use bulk-inserts for
                 * the partition being inserted into.
-                *
-                * Normally, when performing bulk inserts we just flush the insert
-                * buffer whenever it becomes full, but for the partitioned table
-                * case, we flush it whenever the current tuple does not belong to the
-                * same partition as the previous tuple.
                 */
                if (proute)
                        insertMethod = CIM_MULTI_CONDITIONAL;
                else
                        insertMethod = CIM_MULTI;
 
-               bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+               CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
+                                                               estate, mycid, ti_options);
+       }
+
+       /*
+        * If not using batch mode (which allocates slots as needed) set up a
+        * tuple slot too. When inserting into a partitioned table, we also need
+        * one, even if we might batch insert, to read the tuple in the root
+        * partition's form.
+        */
+       if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
+       {
+               singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+                                                                          &estate->es_tupleTable);
+               bistate = GetBulkInsertState();
        }
 
        has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
@@ -2660,10 +2992,6 @@ CopyFrom(CopyState cstate)
         */
        ExecBSInsertTriggers(estate, resultRelInfo);
 
-       values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-       nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-
-       bistate = GetBulkInsertState();
        econtext = GetPerTupleExprContext(estate);
 
        /* Set up callback to identify error line number */
@@ -2672,17 +3000,9 @@ CopyFrom(CopyState cstate)
        errcallback.previous = error_context_stack;
        error_context_stack = &errcallback;
 
-       /*
-        * Set up memory context for batches. For cases without batching we could
-        * use the per-tuple context, but it's simpler to just use it every time.
-        */
-       batchcontext = AllocSetContextCreate(CurrentMemoryContext,
-                                                                                "batch context",
-                                                                                ALLOCSET_DEFAULT_SIZES);
-
        for (;;)
        {
-               TupleTableSlot *slot;
+               TupleTableSlot *myslot;
                bool            skip_tuple;
 
                CHECK_FOR_INTERRUPTS();
@@ -2693,42 +3013,53 @@ CopyFrom(CopyState cstate)
                 */
                ResetPerTupleExprContext(estate);
 
+               /* select slot to (initially) load row into */
+               if (insertMethod == CIM_SINGLE || proute)
+               {
+                       myslot = singleslot;
+                       Assert(myslot != NULL);
+               }
+               else
+               {
+                       Assert(resultRelInfo == target_resultRelInfo);
+                       Assert(insertMethod == CIM_MULTI);
+
+                       myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+                                                                                                        resultRelInfo);
+               }
+
                /*
                 * Switch to per-tuple context before calling NextCopyFrom, which does
                 * evaluate default expressions etc. and requires per-tuple context.
                 */
                MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
-               if (!NextCopyFrom(cstate, econtext, values, nulls))
-                       break;
+               ExecClearTuple(myslot);
 
-               /* Switch into per-batch memory context before forming the tuple. */
-               MemoryContextSwitchTo(batchcontext);
+               /* Directly store the values/nulls array in the slot */
+               if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+                       break;
 
-               /* And now we can form the input tuple. */
-               tuple = heap_form_tuple(tupDesc, values, nulls);
+               ExecStoreVirtualTuple(myslot);
 
                /*
-                * Constraints might reference the tableoid column, so (re-)initialize
-                * tts_tableOid before evaluating them.
+                * Constraints and where clause might reference the tableoid column,
+                * so (re-)initialize tts_tableOid before evaluating them.
                 */
                myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
 
                /* Triggers and stuff need to be invoked in query context. */
                MemoryContextSwitchTo(oldcontext);
 
-               /* Place tuple in tuple slot --- but slot shouldn't free it */
-               slot = myslot;
-               ExecStoreHeapTuple(tuple, slot, false);
-
                if (cstate->whereClause)
                {
                        econtext->ecxt_scantuple = myslot;
+                       /* Skip items that don't match COPY's WHERE clause */
                        if (!ExecQual(cstate->qualexpr, econtext))
                                continue;
                }
 
-               /* Determine the partition to heap_insert the tuple into */
+               /* Determine the partition to insert the tuple into */
                if (proute)
                {
                        TupleConversionMap *map;
@@ -2739,80 +3070,10 @@ CopyFrom(CopyState cstate)
                         * if the found partition is not suitable for INSERTs.
                         */
                        resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
-                                                                                         proute, slot, estate);
+                                                                                         proute, myslot, estate);
 
                        if (prevResultRelInfo != resultRelInfo)
                        {
-                               /* Check if we can multi-insert into this partition */
-                               if (insertMethod == CIM_MULTI_CONDITIONAL)
-                               {
-                                       /*
-                                        * When performing bulk-inserts into partitioned tables we
-                                        * must insert the tuples seen so far to the heap whenever
-                                        * the partition changes.
-                                        */
-                                       if (nBufferedTuples > 0)
-                                       {
-                                               MemoryContext   oldcontext;
-
-                                               CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-                                                                                       prevResultRelInfo, myslot, bistate,
-                                                                                       nBufferedTuples, bufferedTuples,
-                                                                                       firstBufferedLineNo);
-                                               nBufferedTuples = 0;
-                                               bufferedTuplesSize = 0;
-
-                                               /*
-                                                * The tuple is already allocated in the batch context, which
-                                                * we want to reset.  So to keep the tuple we copy it into the
-                                                * short-lived (per-tuple) context, reset the batch context
-                                                * and then copy it back into the per-batch one.
-                                                */
-                                               oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-                                               tuple = heap_copytuple(tuple);
-                                               MemoryContextSwitchTo(oldcontext);
-
-                                               /* cleanup the old batch */
-                                               MemoryContextReset(batchcontext);
-
-                                               /* copy the tuple back to the per-batch context */
-                                               oldcontext = MemoryContextSwitchTo(batchcontext);
-                                               tuple = heap_copytuple(tuple);
-                                               MemoryContextSwitchTo(oldcontext);
-
-                                               /*
-                                                * Also push the tuple copy to the slot (resetting the context
-                                                * invalidated the slot contents).
-                                                */
-                                               ExecStoreHeapTuple(tuple, slot, false);
-                                       }
-
-                                       nPartitionChanges++;
-
-                                       /*
-                                        * Here we adaptively enable multi-inserts based on the
-                                        * average number of tuples from recent multi-insert
-                                        * batches.  We recalculate the average every
-                                        * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
-                                        * the average over the whole copy.  This allows us to
-                                        * enable multi-inserts when we get periods in the copy
-                                        * stream that have tuples commonly belonging to the same
-                                        * partition, and disable when the partition is changing
-                                        * too often.
-                                        */
-                                       if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
-                                                                                                                          RECHECK_MULTI_INSERT_THRESHOLD)
-                                                                && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
-                                       {
-                                               avgTuplesPerPartChange =
-                                                       (cstate->cur_lineno - lastPartitionSampleLineNo) /
-                                                       (double) nPartitionChanges;
-
-                                               lastPartitionSampleLineNo = cstate->cur_lineno;
-                                               nPartitionChanges = 0;
-                                       }
-                               }
-
                                /* Determine which triggers exist on this partition */
                                has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
                                                                                          resultRelInfo->ri_TrigDesc->trig_insert_before_row);
@@ -2821,23 +3082,33 @@ CopyFrom(CopyState cstate)
                                                                                           resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
 
                                /*
-                                * Tests have shown that using multi-inserts when the
-                                * partition changes on every tuple slightly decreases the
-                                * performance, however, there are benefits even when only
-                                * some batches have just 2 tuples, so let's enable
-                                * multi-inserts even when the average is quite low.
+                                * Disable multi-inserts when the partition has BEFORE/INSTEAD
+                                * OF triggers, or if the partition is a foreign partition.
                                 */
                                leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
-                                       avgTuplesPerPartChange >= 1.3 &&
                                        !has_before_insert_row_trig &&
                                        !has_instead_insert_row_trig &&
                                        resultRelInfo->ri_FdwRoutine == NULL;
 
-                               /*
-                                * We'd better make the bulk insert mechanism gets a new
-                                * buffer when the partition being inserted into changes.
-                                */
-                               ReleaseBulkInsertStatePin(bistate);
+                               /* Set the multi-insert buffer to use for this partition. */
+                               if (leafpart_use_multi_insert)
+                               {
+                                       if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
+                                               CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
+                                                                                                          resultRelInfo);
+                               }
+                               else if (insertMethod == CIM_MULTI_CONDITIONAL &&
+                                                !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+                               {
+                                       /*
+                                        * Flush pending inserts if this partition can't use
+                                        * batching, so rows are visible to triggers etc.
+                                        */
+                                       CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+                               }
+
+                               if (bistate != NULL)
+                                       ReleaseBulkInsertStatePin(bistate);
                                prevResultRelInfo = resultRelInfo;
                        }
 
@@ -2879,26 +3150,50 @@ CopyFrom(CopyState cstate)
                         * rowtype.
                         */
                        map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
-                       if (map != NULL)
+                       if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
                        {
-                               TupleTableSlot *new_slot;
-                               MemoryContext oldcontext;
-
-                               new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
-                               Assert(new_slot != NULL);
-
-                               slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
+                               /* non batch insert */
+                               if (map != NULL)
+                               {
+                                       TupleTableSlot *new_slot;
 
+                                       new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
+                                       myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+                               }
+                       }
+                       else
+                       {
                                /*
-                                * Get the tuple in the per-batch context, so that it will be
-                                * freed after each batch insert.
+                                * Prepare to queue up tuple for later batch insert into
+                                * current partition.
                                 */
-                               oldcontext = MemoryContextSwitchTo(batchcontext);
-                               tuple = ExecCopySlotHeapTuple(slot);
-                               MemoryContextSwitchTo(oldcontext);
+                               TupleTableSlot *batchslot;
+
+                               /* no other path available for partitioned table */
+                               Assert(insertMethod == CIM_MULTI_CONDITIONAL);
+
+                               batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+                                                                                                                       resultRelInfo);
+
+                               if (map != NULL)
+                                       myslot = execute_attr_map_slot(map->attrMap, myslot,
+                                                                                                  batchslot);
+                               else
+                               {
+                                       /*
+                                        * This looks more expensive than it is (Believe me, I
+                                        * optimized it away. Twice.). The input is in virtual
+                                        * form, and we'll materialize the slot below - for most
+                                        * slot types the copy performs the work materialization
+                                        * would later require anyway.
+                                        */
+                                       ExecCopySlot(batchslot, myslot);
+                                       myslot = batchslot;
+                               }
                        }
 
-                       slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+                       /* ensure that triggers etc see the right relation  */
+                       myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
                }
 
                skip_tuple = false;
@@ -2906,7 +3201,7 @@ CopyFrom(CopyState cstate)
                /* BEFORE ROW INSERT Triggers */
                if (has_before_insert_row_trig)
                {
-                       if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+                       if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
                                skip_tuple = true;      /* "do nothing" */
                }
 
@@ -2919,7 +3214,7 @@ CopyFrom(CopyState cstate)
                         */
                        if (has_instead_insert_row_trig)
                        {
-                               ExecIRInsertTriggers(estate, resultRelInfo, slot);
+                               ExecIRInsertTriggers(estate, resultRelInfo, myslot);
                        }
                        else
                        {
@@ -2931,12 +3226,7 @@ CopyFrom(CopyState cstate)
                                 */
                                if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
                                        resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
-                               {
-                                       ExecComputeStoredGenerated(estate, slot);
-                                       MemoryContextSwitchTo(batchcontext);
-                                       tuple = ExecCopySlotHeapTuple(slot);
-                                       MemoryContextSwitchTo(oldcontext);
-                               }
+                                       ExecComputeStoredGenerated(estate, myslot);
 
                                /*
                                 * If the target is a plain table, check the constraints of
@@ -2944,7 +3234,7 @@ CopyFrom(CopyState cstate)
                                 */
                                if (resultRelInfo->ri_FdwRoutine == NULL &&
                                        resultRelInfo->ri_RelationDesc->rd_att->constr)
-                                       ExecConstraints(resultRelInfo, slot, estate);
+                                       ExecConstraints(resultRelInfo, myslot, estate);
 
                                /*
                                 * Also check the tuple against the partition constraint, if
@@ -2954,40 +3244,29 @@ CopyFrom(CopyState cstate)
                                 */
                                if (resultRelInfo->ri_PartitionCheck &&
                                        (proute == NULL || has_before_insert_row_trig))
-                                       ExecPartitionCheck(resultRelInfo, slot, estate, true);
+                                       ExecPartitionCheck(resultRelInfo, myslot, estate, true);
 
-                               /*
-                                * Perform multi-inserts when enabled, or when loading a
-                                * partitioned table that can support multi-inserts as
-                                * determined above.
-                                */
+                               /* Store the slot in the multi-insert buffer, when enabled. */
                                if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
                                {
+                                       /*
+                                        * The slot previously might point into the per-tuple
+                                        * context. For batching it needs to be longer lived.
+                                        */
+                                       ExecMaterializeSlot(myslot);
+
                                        /* Add this tuple to the tuple buffer */
-                                       if (nBufferedTuples == 0)
-                                               firstBufferedLineNo = cstate->cur_lineno;
-                                       bufferedTuples[nBufferedTuples++] = tuple;
-                                       bufferedTuplesSize += tuple->t_len;
+                                       CopyMultiInsertInfoStore(&multiInsertInfo,
+                                                                                        resultRelInfo, myslot,
+                                                                                        cstate->line_buf.len,
+                                                                                        cstate->cur_lineno);
 
                                        /*
-                                        * If the buffer filled up, flush it.  Also flush if the
-                                        * total size of all the tuples in the buffer becomes
-                                        * large, to avoid using large amounts of memory for the
-                                        * buffer when the tuples are exceptionally wide.
+                                        * If enough inserts have queued up, then flush all
+                                        * buffers out to their tables.
                                         */
-                                       if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-                                               bufferedTuplesSize > 65535)
-                                       {
-                                               CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-                                                                                       resultRelInfo, myslot, bistate,
-                                                                                       nBufferedTuples, bufferedTuples,
-                                                                                       firstBufferedLineNo);
-                                               nBufferedTuples = 0;
-                                               bufferedTuplesSize = 0;
-
-                                               /* free memory occupied by tuples from the batch */
-                                               MemoryContextReset(batchcontext);
-                                       }
+                                       if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
+                                               CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
                                }
                                else
                                {
@@ -2996,12 +3275,12 @@ CopyFrom(CopyState cstate)
                                        /* OK, store the tuple */
                                        if (resultRelInfo->ri_FdwRoutine != NULL)
                                        {
-                                               slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
-                                                                                                                                                          resultRelInfo,
-                                                                                                                                                          slot,
-                                                                                                                                                          NULL);
+                                               myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
+                                                                                                                                                                resultRelInfo,
+                                                                                                                                                                myslot,
+                                                                                                                                                                NULL);
 
-                                               if (slot == NULL)       /* "do nothing" */
+                                               if (myslot == NULL) /* "do nothing" */
                                                        continue;       /* next tuple please */
 
                                                /*
@@ -3009,27 +3288,24 @@ CopyFrom(CopyState cstate)
                                                 * column, so (re-)initialize tts_tableOid before
                                                 * evaluating them.
                                                 */
-                                               slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+                                               myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
                                        }
                                        else
                                        {
-                                               tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
-                                               heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-                                                                       mycid, ti_options, bistate);
-                                               ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
-                                               slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+                                               /* OK, store the tuple and create index entries for it */
+                                               table_insert(resultRelInfo->ri_RelationDesc, myslot,
+                                                                        mycid, ti_options, bistate);
+
+                                               if (resultRelInfo->ri_NumIndices > 0)
+                                                       recheckIndexes = ExecInsertIndexTuples(myslot,
+                                                                                                                                  estate,
+                                                                                                                                  false,
+                                                                                                                                  NULL,
+                                                                                                                                  NIL);
                                        }
 
-                                       /* And create index entries for it */
-                                       if (resultRelInfo->ri_NumIndices > 0)
-                                               recheckIndexes = ExecInsertIndexTuples(slot,
-                                                                                                                          estate,
-                                                                                                                          false,
-                                                                                                                          NULL,
-                                                                                                                          NIL);
-
                                        /* AFTER ROW INSERT Triggers */
-                                       ExecARInsertTriggers(estate, resultRelInfo, slot,
+                                       ExecARInsertTriggers(estate, resultRelInfo, myslot,
                                                                                 recheckIndexes, cstate->transition_capture);
 
                                        list_free(recheckIndexes);
@@ -3046,31 +3322,23 @@ CopyFrom(CopyState cstate)
        }
 
        /* Flush any remaining buffered tuples */
-       if (nBufferedTuples > 0)
+       if (insertMethod != CIM_SINGLE)
        {
-               if (insertMethod == CIM_MULTI_CONDITIONAL)
-               {
-                       CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-                                                               prevResultRelInfo, myslot, bistate,
-                                                               nBufferedTuples, bufferedTuples,
-                                                               firstBufferedLineNo);
-               }
-               else
-                       CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-                                                               resultRelInfo, myslot, bistate,
-                                                               nBufferedTuples, bufferedTuples,
-                                                               firstBufferedLineNo);
+               if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+                       CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+
+               /* Tear down the multi-insert buffer data */
+               CopyMultiInsertInfoCleanup(&multiInsertInfo);
        }
 
        /* Done, clean up */
        error_context_stack = errcallback.previous;
 
-       FreeBulkInsertState(bistate);
+       if (bistate != NULL)
+               FreeBulkInsertState(bistate);
 
        MemoryContextSwitchTo(oldcontext);
 
-       MemoryContextDelete(batchcontext);
-
        /*
         * In the old protocol, tell pqcomm that we can process normal protocol
         * messages again.
@@ -3084,9 +3352,6 @@ CopyFrom(CopyState cstate)
        /* Handle queued AFTER triggers */
        AfterTriggerEndQuery(estate);
 
-       pfree(values);
-       pfree(nulls);
-
        ExecResetTupleTable(estate->es_tupleTable, false);
 
        /* Allow the FDW to shut down */
@@ -3111,88 +3376,6 @@ CopyFrom(CopyState cstate)
        return processed;
 }
 
-/*
- * A subroutine of CopyFrom, to write the current batch of buffered heap
- * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
- * triggers.
- */
-static void
-CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
-                                       int ti_options, ResultRelInfo *resultRelInfo,
-                                       TupleTableSlot *myslot, BulkInsertState bistate,
-                                       int nBufferedTuples, HeapTuple *bufferedTuples,
-                                       uint64 firstBufferedLineNo)
-{
-       MemoryContext oldcontext;
-       int                     i;
-       uint64          save_cur_lineno;
-       bool            line_buf_valid = cstate->line_buf_valid;
-
-       /*
-        * Print error context information correctly, if one of the operations
-        * below fail.
-        */
-       cstate->line_buf_valid = false;
-       save_cur_lineno = cstate->cur_lineno;
-
-       /*
-        * heap_multi_insert leaks memory, so switch to short-lived memory context
-        * before calling it.
-        */
-       oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-       heap_multi_insert(resultRelInfo->ri_RelationDesc,
-                                         bufferedTuples,
-                                         nBufferedTuples,
-                                         mycid,
-                                         ti_options,
-                                         bistate);
-       MemoryContextSwitchTo(oldcontext);
-
-       /*
-        * If there are any indexes, update them for all the inserted tuples, and
-        * run AFTER ROW INSERT triggers.
-        */
-       if (resultRelInfo->ri_NumIndices > 0)
-       {
-               for (i = 0; i < nBufferedTuples; i++)
-               {
-                       List       *recheckIndexes;
-
-                       cstate->cur_lineno = firstBufferedLineNo + i;
-                       ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
-                       recheckIndexes =
-                               ExecInsertIndexTuples(myslot,
-                                                                         estate, false, NULL, NIL);
-                       ExecARInsertTriggers(estate, resultRelInfo,
-                                                                myslot,
-                                                                recheckIndexes, cstate->transition_capture);
-                       list_free(recheckIndexes);
-               }
-       }
-
-       /*
-        * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
-        * anyway.
-        */
-       else if (resultRelInfo->ri_TrigDesc != NULL &&
-                        (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-                         resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-       {
-               for (i = 0; i < nBufferedTuples; i++)
-               {
-                       cstate->cur_lineno = firstBufferedLineNo + i;
-                       ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
-                       ExecARInsertTriggers(estate, resultRelInfo,
-                                                                myslot,
-                                                                NIL, cstate->transition_capture);
-               }
-       }
-
-       /* reset cur_lineno and line_buf_valid to what they were */
-       cstate->line_buf_valid = line_buf_valid;
-       cstate->cur_lineno = save_cur_lineno;
-}
-
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -4990,11 +5173,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
        DR_copy    *myState = (DR_copy *) self;
        CopyState       cstate = myState->cstate;
 
-       /* Make sure the tuple is fully deconstructed */
-       slot_getallattrs(slot);
-
-       /* And send the data */
-       CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
+       /* Send the data */
+       CopyOneRowTo(cstate, slot);
        myState->processed++;
 
        return true;
index 03dcc7b820b351eb857ef59aa92cce2d1d34574b..602a08e5858151b50778f1ae0a7338df7c3dc65e 100644 (file)
@@ -1346,6 +1346,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
        resultRelInfo->ri_PartitionCheck = partition_check;
        resultRelInfo->ri_PartitionRoot = partition_root;
        resultRelInfo->ri_PartitionInfo = NULL; /* may be set later */
+       resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
 }
 
 /*
index b72db85aab4a5f04a2d249dd17fb40b850fa4b24..50800aa5ca53ef8bb1516d338c93404ae5bca59a 100644 (file)
@@ -947,6 +947,7 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
                partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
 
        partRelInfo->ri_PartitionInfo = partrouteinfo;
+       partRelInfo->ri_CopyMultiInsertBuffer = NULL;
 
        /*
         * Keep track of it in the PartitionTupleRouting->partitions array.
index 4c077755d5412f424d43aad1cb86a3557d6dc3ab..77e5e603b032e4644fb3271f5202461288fe4ccb 100644 (file)
@@ -36,6 +36,7 @@
 #define HEAP_INSERT_SPECULATIVE 0x0010
 
 typedef struct BulkInsertStateData *BulkInsertState;
+struct TupleTableSlot;
 
 #define MaxLockTupleMode       LockTupleExclusive
 
@@ -143,8 +144,9 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
                        int options, BulkInsertState bistate);
-extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
-                                 CommandId cid, int options, BulkInsertState bistate);
+extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
+                                 int ntuples, CommandId cid, int options,
+                                 BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
                        CommandId cid, Snapshot crosscheck, bool wait,
                        struct TM_FailureData *tmfd, bool changingPart);
index 42e2ba68bf91db7a738f97832ffb7192fbd03fb2..90c329a88d3725a6e9234d7be3b25934e33e3567 100644 (file)
@@ -350,6 +350,10 @@ typedef struct TableAmRoutine
                                                                                           uint32 specToken,
                                                                                           bool succeeded);
 
+       /* see table_multi_insert() for reference about parameters */
+       void            (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
+                                                                CommandId cid, int options, struct BulkInsertStateData *bistate);
+
        /* see table_delete() for reference about parameters */
        TM_Result       (*tuple_delete) (Relation rel,
                                                                 ItemPointer tid,
@@ -1077,6 +1081,28 @@ table_complete_speculative(Relation rel, TupleTableSlot *slot,
                                                                                                succeeded);
 }
 
+/*
+ * Insert multiple tuple into a table.
+ *
+ * This is like table_insert(), but inserts multiple tuples in one
+ * operation. That's often faster than calling table_insert() in a loop,
+ * because e.g. the AM can reduce WAL logging and page locking overhead.
+ *
+ * Except for taking `nslots` tuples as input, as an array of TupleTableSlots
+ * in `slots`, the parameters for table_multi_insert() are the same as for
+ * table_insert().
+ *
+ * Note: this leaks memory into the current memory context. You can create a
+ * temporary context before calling this, if that's a problem.
+ */
+static inline void
+table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
+                                  CommandId cid, int options, struct BulkInsertStateData *bistate)
+{
+       rel->rd_tableam->multi_insert(rel, slots, nslots,
+                                                                 cid, options, bistate);
+}
+
 /*
  * Delete a tuple.
  *
index 5b4ea6c23552b1bca4990c6586a106aea46f1e5b..a5e4b7ef2e03e90300157133198899012ad5fc23 100644 (file)
@@ -40,6 +40,7 @@ struct ExprState;
 struct ExprContext;
 struct RangeTblEntry;                  /* avoid including parsenodes.h here */
 struct ExprEvalStep;                   /* avoid including execExpr.h everywhere */
+struct CopyMultiInsertBuffer;
 
 
 /* ----------------
@@ -481,6 +482,9 @@ typedef struct ResultRelInfo
 
        /* Additional information specific to partition tuple routing */
        struct PartitionRoutingInfo *ri_PartitionInfo;
+
+       /* For use by copy.c when performing multi-inserts */
+       struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
 } ResultRelInfo;
 
 /* ----------------
index e09f9353ed2618aeae60affe60323b0989d1fb68..c6050a3f774a292fa3f17cec943b8d5af980b944 100644 (file)
@@ -402,6 +402,9 @@ ConversionLocation
 ConvertRowtypeExpr
 CookedConstraint
 CopyDest
+CopyInsertMethod
+CopyMultiInsertBuffer
+CopyMultiInsertInfo
 CopyState
 CopyStateData
 CopyStmt