From: Andres Freund Date: Mon, 1 Apr 2019 21:41:42 +0000 (-0700) Subject: tableam: Add table_finish_bulk_insert(). X-Git-Tag: REL_12_BETA1~341 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d45e40158623baacd3f36f92a670d435cc1e845f;p=postgresql tableam: Add table_finish_bulk_insert(). This replaces the previous calls of heap_sync() in places using bulk-insert. By passing in the flags used for bulk-insert the AM can decide (first at insert time and then during the finish call) which of the optimizations apply to it, and what operations are necessary to finish a bulk insert operation. Also change HEAP_INSERT_* flags to TABLE_INSERT, and rename hi_options to ti_options. These changes are made even in copy.c, which hasn't yet been converted to tableam. There's no harm in doing so. Author: Andres Freund Discussion: https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de --- diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index ce54b16f34..5c96fc91b7 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -540,6 +540,17 @@ tuple_lock_retry: return result; } +static void +heapam_finish_bulk_insert(Relation relation, int options) +{ + /* + * If we skipped writing WAL, then we need to sync the heap (but not + * indexes since those use WAL anyway / don't go through tableam) + */ + if (options & HEAP_INSERT_SKIP_WAL) + heap_sync(relation); +} + /* ------------------------------------------------------------------------ * DDL related callbacks for heap AM. @@ -2401,6 +2412,7 @@ static const TableAmRoutine heapam_methods = { .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, + .finish_bulk_insert = heapam_finish_bulk_insert, .tuple_fetch_row_version = heapam_fetch_row_version, .tuple_get_latest_tid = heap_get_latest_tid, diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 27d3a012af..c1fd7b78ce 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -319,7 +319,7 @@ static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls); static void CopyFromInsertBatch(CopyState cstate, EState *estate, - CommandId mycid, int hi_options, + CommandId mycid, int ti_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, BulkInsertState bistate, int nBufferedTuples, HeapTuple *bufferedTuples, @@ -2328,7 +2328,7 @@ CopyFrom(CopyState cstate) PartitionTupleRouting *proute = NULL; ErrorContextCallback errcallback; CommandId mycid = GetCurrentCommandId(true); - int hi_options = 0; /* start with default heap_insert options */ + int ti_options = 0; /* start with default table_insert options */ BulkInsertState bistate; CopyInsertMethod insertMethod; uint64 processed = 0; @@ -2392,8 +2392,8 @@ CopyFrom(CopyState cstate) * - data is being written to relfilenode created in this transaction * then we can skip writing WAL. It's safe because if the transaction * doesn't commit, we'll discard the table (or the new relfilenode file). - * If it does commit, we'll have done the heap_sync at the bottom of this - * routine first. + * If it does commit, we'll have done the table_finish_bulk_insert() at + * the bottom of this routine first. * * As mentioned in comments in utils/rel.h, the in-same-transaction test * is not always set correctly, since in rare cases rd_newRelfilenodeSubid @@ -2437,9 +2437,9 @@ CopyFrom(CopyState cstate) (cstate->rel->rd_createSubid != InvalidSubTransactionId || cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)) { - hi_options |= HEAP_INSERT_SKIP_FSM; + ti_options |= TABLE_INSERT_SKIP_FSM; if (!XLogIsNeeded()) - hi_options |= HEAP_INSERT_SKIP_WAL; + ti_options |= TABLE_INSERT_SKIP_WAL; } /* @@ -2491,7 +2491,7 @@ CopyFrom(CopyState cstate) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction"))); - hi_options |= HEAP_INSERT_FROZEN; + ti_options |= TABLE_INSERT_FROZEN; } /* @@ -2755,7 +2755,7 @@ CopyFrom(CopyState cstate) { MemoryContext oldcontext; - CopyFromInsertBatch(cstate, estate, mycid, hi_options, + CopyFromInsertBatch(cstate, estate, mycid, ti_options, prevResultRelInfo, myslot, bistate, nBufferedTuples, bufferedTuples, firstBufferedLineNo); @@ -2978,7 +2978,7 @@ CopyFrom(CopyState cstate) if (nBufferedTuples == MAX_BUFFERED_TUPLES || bufferedTuplesSize > 65535) { - CopyFromInsertBatch(cstate, estate, mycid, hi_options, + CopyFromInsertBatch(cstate, estate, mycid, ti_options, resultRelInfo, myslot, bistate, nBufferedTuples, bufferedTuples, firstBufferedLineNo); @@ -3015,7 +3015,7 @@ CopyFrom(CopyState cstate) { tuple = ExecFetchSlotHeapTuple(slot, true, NULL); heap_insert(resultRelInfo->ri_RelationDesc, tuple, - mycid, hi_options, bistate); + mycid, ti_options, bistate); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } @@ -3050,13 +3050,13 @@ CopyFrom(CopyState cstate) { if (insertMethod == CIM_MULTI_CONDITIONAL) { - CopyFromInsertBatch(cstate, estate, mycid, hi_options, + CopyFromInsertBatch(cstate, estate, mycid, ti_options, prevResultRelInfo, myslot, bistate, nBufferedTuples, bufferedTuples, firstBufferedLineNo); } else - CopyFromInsertBatch(cstate, estate, mycid, hi_options, + CopyFromInsertBatch(cstate, estate, mycid, ti_options, resultRelInfo, myslot, bistate, nBufferedTuples, bufferedTuples, firstBufferedLineNo); @@ -3106,12 +3106,7 @@ CopyFrom(CopyState cstate) FreeExecutorState(estate); - /* - * If we skipped writing WAL, then we need to sync the heap (but not - * indexes since those use WAL anyway) - */ - if (hi_options & HEAP_INSERT_SKIP_WAL) - heap_sync(cstate->rel); + table_finish_bulk_insert(cstate->rel, ti_options); return processed; } @@ -3123,7 +3118,7 @@ CopyFrom(CopyState cstate) */ static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, - int hi_options, ResultRelInfo *resultRelInfo, + int ti_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, BulkInsertState bistate, int nBufferedTuples, HeapTuple *bufferedTuples, uint64 firstBufferedLineNo) @@ -3149,7 +3144,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, bufferedTuples, nBufferedTuples, mycid, - hi_options, + ti_options, bistate); MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 3bdb67c697..43c2fa9124 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -27,8 +27,8 @@ #include "access/heapam.h" #include "access/reloptions.h" #include "access/htup_details.h" -#include "access/tableam.h" #include "access/sysattr.h" +#include "access/tableam.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/namespace.h" @@ -60,7 +60,7 @@ typedef struct Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ CommandId output_cid; /* cmin to insert in output tuples */ - int hi_options; /* heap_insert performance options */ + int ti_options; /* table_insert performance options */ BulkInsertState bistate; /* bulk insert state */ } DR_intorel; @@ -558,8 +558,8 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) * We can skip WAL-logging the insertions, unless PITR or streaming * replication is in use. We can skip the FSM in any case. */ - myState->hi_options = HEAP_INSERT_SKIP_FSM | - (XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL); + myState->ti_options = TABLE_INSERT_SKIP_FSM | + (XLogIsNeeded() ? 0 : TABLE_INSERT_SKIP_WAL); myState->bistate = GetBulkInsertState(); /* Not using WAL requires smgr_targblock be initially invalid */ @@ -586,7 +586,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) table_insert(myState->rel, slot, myState->output_cid, - myState->hi_options, + myState->ti_options, myState->bistate); /* We know this is a newly created relation, so there are no indexes */ @@ -604,9 +604,7 @@ intorel_shutdown(DestReceiver *self) FreeBulkInsertState(myState->bistate); - /* If we skipped using WAL, must heap_sync before commit */ - if (myState->hi_options & HEAP_INSERT_SKIP_WAL) - heap_sync(myState->rel); + table_finish_bulk_insert(myState->rel, myState->ti_options); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 5b2cbc7c89..2aac63296b 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -18,6 +18,7 @@ #include "access/heapam.h" #include "access/htup_details.h" #include "access/multixact.h" +#include "access/tableam.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/catalog.h" @@ -53,7 +54,7 @@ typedef struct /* These fields are filled by transientrel_startup: */ Relation transientrel; /* relation to write to */ CommandId output_cid; /* cmin to insert in output tuples */ - int hi_options; /* heap_insert performance options */ + int ti_options; /* table_insert performance options */ BulkInsertState bistate; /* bulk insert state */ } DR_transientrel; @@ -257,7 +258,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, * as open scans. * * NB: We count on this to protect us against problems with refreshing the - * data using HEAP_INSERT_FROZEN. + * data using TABLE_INSERT_FROZEN. */ CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW"); @@ -461,9 +462,9 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) * We can skip WAL-logging the insertions, unless PITR or streaming * replication is in use. We can skip the FSM in any case. */ - myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN; + myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; if (!XLogIsNeeded()) - myState->hi_options |= HEAP_INSERT_SKIP_WAL; + myState->ti_options |= TABLE_INSERT_SKIP_WAL; myState->bistate = GetBulkInsertState(); /* Not using WAL requires smgr_targblock be initially invalid */ @@ -490,7 +491,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) table_insert(myState->transientrel, slot, myState->output_cid, - myState->hi_options, + myState->ti_options, myState->bistate); /* We know this is a newly created relation, so there are no indexes */ @@ -508,9 +509,7 @@ transientrel_shutdown(DestReceiver *self) FreeBulkInsertState(myState->bistate); - /* If we skipped using WAL, must heap_sync before commit */ - if (myState->hi_options & HEAP_INSERT_SKIP_WAL) - heap_sync(myState->transientrel); + table_finish_bulk_insert(myState->transientrel, myState->ti_options); /* close transientrel, but keep lock until commit */ table_close(myState->transientrel, NoLock); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 31e2b508b8..654179297c 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -4687,7 +4687,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) EState *estate; CommandId mycid; BulkInsertState bistate; - int hi_options; + int ti_options; ExprState *partqualstate = NULL; /* @@ -4704,7 +4704,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) newrel = NULL; /* - * Prepare a BulkInsertState and options for heap_insert. Because we're + * Prepare a BulkInsertState and options for table_insert. Because we're * building a new heap, we can skip WAL-logging and fsync it to disk at * the end instead (unless WAL-logging is required for archiving or * streaming replication). The FSM is empty too, so don't bother using it. @@ -4714,16 +4714,16 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) mycid = GetCurrentCommandId(true); bistate = GetBulkInsertState(); - hi_options = HEAP_INSERT_SKIP_FSM; + ti_options = TABLE_INSERT_SKIP_FSM; if (!XLogIsNeeded()) - hi_options |= HEAP_INSERT_SKIP_WAL; + ti_options |= TABLE_INSERT_SKIP_WAL; } else { /* keep compiler quiet about using these uninitialized */ mycid = 0; bistate = NULL; - hi_options = 0; + ti_options = 0; } /* @@ -4977,7 +4977,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) /* Write the tuple out to the new relation */ if (newrel) - table_insert(newrel, insertslot, mycid, hi_options, bistate); + table_insert(newrel, insertslot, mycid, ti_options, bistate); ResetExprContext(econtext); @@ -5000,9 +5000,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) { FreeBulkInsertState(bistate); - /* If we skipped writing WAL, then we need to sync the heap. */ - if (hi_options & HEAP_INSERT_SKIP_WAL) - heap_sync(newrel); + table_finish_bulk_insert(newrel, ti_options); table_close(newrel, NoLock); } diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 9bc1d24b4a..4efe178ed1 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -380,6 +380,21 @@ typedef struct TableAmRoutine uint8 flags, TM_FailureData *tmfd); + /* + * Perform operations necessary to complete insertions made via + * tuple_insert and multi_insert with a BulkInsertState specified. This + * e.g. may e.g. used to flush the relation when inserting with + * TABLE_INSERT_SKIP_WAL specified. + * + * Typically callers of tuple_insert and multi_insert will just pass all + * the flags the apply to them, and each AM has to decide which of them + * make sense for it, and then only take actions in finish_bulk_insert + * that make sense for a specific AM. + * + * Optional callback. + */ + void (*finish_bulk_insert) (Relation rel, int options); + /* ------------------------------------------------------------------------ * DDL related functionality. @@ -1011,7 +1026,8 @@ table_compute_xid_horizon_for_tuples(Relation rel, * * * The BulkInsertState object (if any; bistate can be NULL for default - * behavior) is also just passed through to RelationGetBufferForTuple. + * behavior) is also just passed through to RelationGetBufferForTuple. If + * `bistate` is provided, table_finish_bulk_insert() needs to be called. * * On return the slot's tts_tid and tts_tableOid are updated to reflect the * insertion. But note that any toasting of fields within the slot is NOT @@ -1185,6 +1201,20 @@ table_lock_tuple(Relation rel, ItemPointer tid, Snapshot snapshot, flags, tmfd); } +/* + * Perform operations necessary to complete insertions made via + * tuple_insert and multi_insert with a BulkInsertState specified. This + * e.g. may e.g. used to flush the relation when inserting with + * TABLE_INSERT_SKIP_WAL specified. + */ +static inline void +table_finish_bulk_insert(Relation rel, int options) +{ + /* optional callback */ + if (rel->rd_tableam && rel->rd_tableam->finish_bulk_insert) + rel->rd_tableam->finish_bulk_insert(rel, options); +} + /* ------------------------------------------------------------------------ * DDL related functionality.