]> granicus.if.org Git - postgresql/commitdiff
tableam: Add table_finish_bulk_insert().
authorAndres Freund <andres@anarazel.de>
Mon, 1 Apr 2019 21:41:42 +0000 (14:41 -0700)
committerAndres Freund <andres@anarazel.de>
Mon, 1 Apr 2019 21:41:42 +0000 (14:41 -0700)
This replaces the previous calls of heap_sync() in places using
bulk-insert. By passing in the flags used for bulk-insert the AM can
decide (first at insert time and then during the finish call) which of
the optimizations apply to it, and what operations are necessary to
finish a bulk insert operation.

Also change HEAP_INSERT_* flags to TABLE_INSERT, and rename hi_options
to ti_options.

These changes are made even in copy.c, which hasn't yet been converted
to tableam. There's no harm in doing so.

Author: Andres Freund
Discussion: https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de

src/backend/access/heap/heapam_handler.c
src/backend/commands/copy.c
src/backend/commands/createas.c
src/backend/commands/matview.c
src/backend/commands/tablecmds.c
src/include/access/tableam.h

index ce54b16f34259cdf7925c146b00097c6e3daf74d..5c96fc91b796390408771d6f473c5f7b8d3920fa 100644 (file)
@@ -540,6 +540,17 @@ tuple_lock_retry:
        return result;
 }
 
+static void
+heapam_finish_bulk_insert(Relation relation, int options)
+{
+       /*
+        * If we skipped writing WAL, then we need to sync the heap (but not
+        * indexes since those use WAL anyway / don't go through tableam)
+        */
+       if (options & HEAP_INSERT_SKIP_WAL)
+               heap_sync(relation);
+}
+
 
 /* ------------------------------------------------------------------------
  * DDL related callbacks for heap AM.
@@ -2401,6 +2412,7 @@ static const TableAmRoutine heapam_methods = {
        .tuple_delete = heapam_tuple_delete,
        .tuple_update = heapam_tuple_update,
        .tuple_lock = heapam_tuple_lock,
+       .finish_bulk_insert = heapam_finish_bulk_insert,
 
        .tuple_fetch_row_version = heapam_fetch_row_version,
        .tuple_get_latest_tid = heap_get_latest_tid,
index 27d3a012afa0611651f7b96d16bdef2670ac7c08..c1fd7b78ce9d1a5daf297fb2bd530f9193047300 100644 (file)
@@ -319,7 +319,7 @@ static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate,
                         Datum *values, bool *nulls);
 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
-                                       CommandId mycid, int hi_options,
+                                       CommandId mycid, int ti_options,
                                        ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
                                        BulkInsertState bistate,
                                        int nBufferedTuples, HeapTuple *bufferedTuples,
@@ -2328,7 +2328,7 @@ CopyFrom(CopyState cstate)
        PartitionTupleRouting *proute = NULL;
        ErrorContextCallback errcallback;
        CommandId       mycid = GetCurrentCommandId(true);
-       int                     hi_options = 0; /* start with default heap_insert options */
+       int                     ti_options = 0; /* start with default table_insert options */
        BulkInsertState bistate;
        CopyInsertMethod insertMethod;
        uint64          processed = 0;
@@ -2392,8 +2392,8 @@ CopyFrom(CopyState cstate)
         *      - data is being written to relfilenode created in this transaction
         * then we can skip writing WAL.  It's safe because if the transaction
         * doesn't commit, we'll discard the table (or the new relfilenode file).
-        * If it does commit, we'll have done the heap_sync at the bottom of this
-        * routine first.
+        * If it does commit, we'll have done the table_finish_bulk_insert() at
+        * the bottom of this routine first.
         *
         * As mentioned in comments in utils/rel.h, the in-same-transaction test
         * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
@@ -2437,9 +2437,9 @@ CopyFrom(CopyState cstate)
                (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
                 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
        {
-               hi_options |= HEAP_INSERT_SKIP_FSM;
+               ti_options |= TABLE_INSERT_SKIP_FSM;
                if (!XLogIsNeeded())
-                       hi_options |= HEAP_INSERT_SKIP_WAL;
+                       ti_options |= TABLE_INSERT_SKIP_WAL;
        }
 
        /*
@@ -2491,7 +2491,7 @@ CopyFrom(CopyState cstate)
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
 
-               hi_options |= HEAP_INSERT_FROZEN;
+               ti_options |= TABLE_INSERT_FROZEN;
        }
 
        /*
@@ -2755,7 +2755,7 @@ CopyFrom(CopyState cstate)
                                        {
                                                MemoryContext   oldcontext;
 
-                                               CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                               CopyFromInsertBatch(cstate, estate, mycid, ti_options,
                                                                                        prevResultRelInfo, myslot, bistate,
                                                                                        nBufferedTuples, bufferedTuples,
                                                                                        firstBufferedLineNo);
@@ -2978,7 +2978,7 @@ CopyFrom(CopyState cstate)
                                        if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
                                                bufferedTuplesSize > 65535)
                                        {
-                                               CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                               CopyFromInsertBatch(cstate, estate, mycid, ti_options,
                                                                                        resultRelInfo, myslot, bistate,
                                                                                        nBufferedTuples, bufferedTuples,
                                                                                        firstBufferedLineNo);
@@ -3015,7 +3015,7 @@ CopyFrom(CopyState cstate)
                                        {
                                                tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
                                                heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-                                                                       mycid, hi_options, bistate);
+                                                                       mycid, ti_options, bistate);
                                                ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
                                                slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
                                        }
@@ -3050,13 +3050,13 @@ CopyFrom(CopyState cstate)
        {
                if (insertMethod == CIM_MULTI_CONDITIONAL)
                {
-                       CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                       CopyFromInsertBatch(cstate, estate, mycid, ti_options,
                                                                prevResultRelInfo, myslot, bistate,
                                                                nBufferedTuples, bufferedTuples,
                                                                firstBufferedLineNo);
                }
                else
-                       CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                       CopyFromInsertBatch(cstate, estate, mycid, ti_options,
                                                                resultRelInfo, myslot, bistate,
                                                                nBufferedTuples, bufferedTuples,
                                                                firstBufferedLineNo);
@@ -3106,12 +3106,7 @@ CopyFrom(CopyState cstate)
 
        FreeExecutorState(estate);
 
-       /*
-        * If we skipped writing WAL, then we need to sync the heap (but not
-        * indexes since those use WAL anyway)
-        */
-       if (hi_options & HEAP_INSERT_SKIP_WAL)
-               heap_sync(cstate->rel);
+       table_finish_bulk_insert(cstate->rel, ti_options);
 
        return processed;
 }
@@ -3123,7 +3118,7 @@ CopyFrom(CopyState cstate)
  */
 static void
 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
-                                       int hi_options, ResultRelInfo *resultRelInfo,
+                                       int ti_options, ResultRelInfo *resultRelInfo,
                                        TupleTableSlot *myslot, BulkInsertState bistate,
                                        int nBufferedTuples, HeapTuple *bufferedTuples,
                                        uint64 firstBufferedLineNo)
@@ -3149,7 +3144,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
                                          bufferedTuples,
                                          nBufferedTuples,
                                          mycid,
-                                         hi_options,
+                                         ti_options,
                                          bistate);
        MemoryContextSwitchTo(oldcontext);
 
index 3bdb67c697cb368c5b05a3df33d41a0b25f5a4b6..43c2fa91242eea6a2e61f790a6f2de5f3d131a83 100644 (file)
@@ -27,8 +27,8 @@
 #include "access/heapam.h"
 #include "access/reloptions.h"
 #include "access/htup_details.h"
-#include "access/tableam.h"
 #include "access/sysattr.h"
+#include "access/tableam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/namespace.h"
@@ -60,7 +60,7 @@ typedef struct
        Relation        rel;                    /* relation to write to */
        ObjectAddress reladdr;          /* address of rel, for ExecCreateTableAs */
        CommandId       output_cid;             /* cmin to insert in output tuples */
-       int                     hi_options;             /* heap_insert performance options */
+       int                     ti_options;             /* table_insert performance options */
        BulkInsertState bistate;        /* bulk insert state */
 } DR_intorel;
 
@@ -558,8 +558,8 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
         * We can skip WAL-logging the insertions, unless PITR or streaming
         * replication is in use. We can skip the FSM in any case.
         */
-       myState->hi_options = HEAP_INSERT_SKIP_FSM |
-               (XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
+       myState->ti_options = TABLE_INSERT_SKIP_FSM |
+               (XLogIsNeeded() ? 0 : TABLE_INSERT_SKIP_WAL);
        myState->bistate = GetBulkInsertState();
 
        /* Not using WAL requires smgr_targblock be initially invalid */
@@ -586,7 +586,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
        table_insert(myState->rel,
                                 slot,
                                 myState->output_cid,
-                                myState->hi_options,
+                                myState->ti_options,
                                 myState->bistate);
 
        /* We know this is a newly created relation, so there are no indexes */
@@ -604,9 +604,7 @@ intorel_shutdown(DestReceiver *self)
 
        FreeBulkInsertState(myState->bistate);
 
-       /* If we skipped using WAL, must heap_sync before commit */
-       if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
-               heap_sync(myState->rel);
+       table_finish_bulk_insert(myState->rel, myState->ti_options);
 
        /* close rel, but keep lock until commit */
        table_close(myState->rel, NoLock);
index 5b2cbc7c89cde05bbd3a8c729d545010bedce939..2aac63296bfee535af3ea660c617b265d7ec8042 100644 (file)
@@ -18,6 +18,7 @@
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
+#include "access/tableam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
@@ -53,7 +54,7 @@ typedef struct
        /* These fields are filled by transientrel_startup: */
        Relation        transientrel;   /* relation to write to */
        CommandId       output_cid;             /* cmin to insert in output tuples */
-       int                     hi_options;             /* heap_insert performance options */
+       int                     ti_options;             /* table_insert performance options */
        BulkInsertState bistate;        /* bulk insert state */
 } DR_transientrel;
 
@@ -257,7 +258,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
         * as open scans.
         *
         * NB: We count on this to protect us against problems with refreshing the
-        * data using HEAP_INSERT_FROZEN.
+        * data using TABLE_INSERT_FROZEN.
         */
        CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
 
@@ -461,9 +462,9 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
         * We can skip WAL-logging the insertions, unless PITR or streaming
         * replication is in use. We can skip the FSM in any case.
         */
-       myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
+       myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
        if (!XLogIsNeeded())
-               myState->hi_options |= HEAP_INSERT_SKIP_WAL;
+               myState->ti_options |= TABLE_INSERT_SKIP_WAL;
        myState->bistate = GetBulkInsertState();
 
        /* Not using WAL requires smgr_targblock be initially invalid */
@@ -490,7 +491,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
        table_insert(myState->transientrel,
                                 slot,
                                 myState->output_cid,
-                                myState->hi_options,
+                                myState->ti_options,
                                 myState->bistate);
 
        /* We know this is a newly created relation, so there are no indexes */
@@ -508,9 +509,7 @@ transientrel_shutdown(DestReceiver *self)
 
        FreeBulkInsertState(myState->bistate);
 
-       /* If we skipped using WAL, must heap_sync before commit */
-       if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
-               heap_sync(myState->transientrel);
+       table_finish_bulk_insert(myState->transientrel, myState->ti_options);
 
        /* close transientrel, but keep lock until commit */
        table_close(myState->transientrel, NoLock);
index 31e2b508b8cd8ddf4c1b99992ce308101ced7e79..654179297cfe927e1ca7dd9ab30751f319971c3c 100644 (file)
@@ -4687,7 +4687,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
        EState     *estate;
        CommandId       mycid;
        BulkInsertState bistate;
-       int                     hi_options;
+       int                     ti_options;
        ExprState  *partqualstate = NULL;
 
        /*
@@ -4704,7 +4704,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
                newrel = NULL;
 
        /*
-        * Prepare a BulkInsertState and options for heap_insert. Because we're
+        * Prepare a BulkInsertState and options for table_insert. Because we're
         * building a new heap, we can skip WAL-logging and fsync it to disk at
         * the end instead (unless WAL-logging is required for archiving or
         * streaming replication). The FSM is empty too, so don't bother using it.
@@ -4714,16 +4714,16 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
                mycid = GetCurrentCommandId(true);
                bistate = GetBulkInsertState();
 
-               hi_options = HEAP_INSERT_SKIP_FSM;
+               ti_options = TABLE_INSERT_SKIP_FSM;
                if (!XLogIsNeeded())
-                       hi_options |= HEAP_INSERT_SKIP_WAL;
+                       ti_options |= TABLE_INSERT_SKIP_WAL;
        }
        else
        {
                /* keep compiler quiet about using these uninitialized */
                mycid = 0;
                bistate = NULL;
-               hi_options = 0;
+               ti_options = 0;
        }
 
        /*
@@ -4977,7 +4977,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 
                        /* Write the tuple out to the new relation */
                        if (newrel)
-                               table_insert(newrel, insertslot, mycid, hi_options, bistate);
+                               table_insert(newrel, insertslot, mycid, ti_options, bistate);
 
                        ResetExprContext(econtext);
 
@@ -5000,9 +5000,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
        {
                FreeBulkInsertState(bistate);
 
-               /* If we skipped writing WAL, then we need to sync the heap. */
-               if (hi_options & HEAP_INSERT_SKIP_WAL)
-                       heap_sync(newrel);
+               table_finish_bulk_insert(newrel, ti_options);
 
                table_close(newrel, NoLock);
        }
index 9bc1d24b4a6b7ec4b6a3575dd0adf5617b06681b..4efe178ed1eb85ca85f7398ee8024c9280c5e27c 100644 (file)
@@ -380,6 +380,21 @@ typedef struct TableAmRoutine
                                                           uint8 flags,
                                                           TM_FailureData *tmfd);
 
+       /*
+        * Perform operations necessary to complete insertions made via
+        * tuple_insert and multi_insert with a BulkInsertState specified. This
+        * e.g. may e.g. used to flush the relation when inserting with
+        * TABLE_INSERT_SKIP_WAL specified.
+        *
+        * Typically callers of tuple_insert and multi_insert will just pass all
+        * the flags the apply to them, and each AM has to decide which of them
+        * make sense for it, and then only take actions in finish_bulk_insert
+        * that make sense for a specific AM.
+        *
+        * Optional callback.
+        */
+       void            (*finish_bulk_insert) (Relation rel, int options);
+
 
        /* ------------------------------------------------------------------------
         * DDL related functionality.
@@ -1011,7 +1026,8 @@ table_compute_xid_horizon_for_tuples(Relation rel,
  *
  *
  * The BulkInsertState object (if any; bistate can be NULL for default
- * behavior) is also just passed through to RelationGetBufferForTuple.
+ * behavior) is also just passed through to RelationGetBufferForTuple. If
+ * `bistate` is provided, table_finish_bulk_insert() needs to be called.
  *
  * On return the slot's tts_tid and tts_tableOid are updated to reflect the
  * insertion. But note that any toasting of fields within the slot is NOT
@@ -1185,6 +1201,20 @@ table_lock_tuple(Relation rel, ItemPointer tid, Snapshot snapshot,
                                                                           flags, tmfd);
 }
 
+/*
+ * Perform operations necessary to complete insertions made via
+ * tuple_insert and multi_insert with a BulkInsertState specified. This
+ * e.g. may e.g. used to flush the relation when inserting with
+ * TABLE_INSERT_SKIP_WAL specified.
+ */
+static inline void
+table_finish_bulk_insert(Relation rel, int options)
+{
+       /* optional callback */
+       if (rel->rd_tableam && rel->rd_tableam->finish_bulk_insert)
+               rel->rd_tableam->finish_bulk_insert(rel, options);
+}
+
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.