return result;
}
+static void
+heapam_finish_bulk_insert(Relation relation, int options)
+{
+ /*
+ * If we skipped writing WAL, then we need to sync the heap (but not
+ * indexes since those use WAL anyway / don't go through tableam)
+ */
+ if (options & HEAP_INSERT_SKIP_WAL)
+ heap_sync(relation);
+}
+
/* ------------------------------------------------------------------------
* DDL related callbacks for heap AM.
.tuple_delete = heapam_tuple_delete,
.tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock,
+ .finish_bulk_insert = heapam_finish_bulk_insert,
.tuple_fetch_row_version = heapam_fetch_row_version,
.tuple_get_latest_tid = heap_get_latest_tid,
static void CopyOneRowTo(CopyState cstate,
Datum *values, bool *nulls);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
- CommandId mycid, int hi_options,
+ CommandId mycid, int ti_options,
ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
BulkInsertState bistate,
int nBufferedTuples, HeapTuple *bufferedTuples,
PartitionTupleRouting *proute = NULL;
ErrorContextCallback errcallback;
CommandId mycid = GetCurrentCommandId(true);
- int hi_options = 0; /* start with default heap_insert options */
+ int ti_options = 0; /* start with default table_insert options */
BulkInsertState bistate;
CopyInsertMethod insertMethod;
uint64 processed = 0;
* - data is being written to relfilenode created in this transaction
* then we can skip writing WAL. It's safe because if the transaction
* doesn't commit, we'll discard the table (or the new relfilenode file).
- * If it does commit, we'll have done the heap_sync at the bottom of this
- * routine first.
+ * If it does commit, we'll have done the table_finish_bulk_insert() at
+ * the bottom of this routine first.
*
* As mentioned in comments in utils/rel.h, the in-same-transaction test
* is not always set correctly, since in rare cases rd_newRelfilenodeSubid
(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
{
- hi_options |= HEAP_INSERT_SKIP_FSM;
+ ti_options |= TABLE_INSERT_SKIP_FSM;
if (!XLogIsNeeded())
- hi_options |= HEAP_INSERT_SKIP_WAL;
+ ti_options |= TABLE_INSERT_SKIP_WAL;
}
/*
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
- hi_options |= HEAP_INSERT_FROZEN;
+ ti_options |= TABLE_INSERT_FROZEN;
}
/*
{
MemoryContext oldcontext;
- CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ CopyFromInsertBatch(cstate, estate, mycid, ti_options,
prevResultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
bufferedTuplesSize > 65535)
{
- CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ CopyFromInsertBatch(cstate, estate, mycid, ti_options,
resultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
{
tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
- mycid, hi_options, bistate);
+ mycid, ti_options, bistate);
ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
{
if (insertMethod == CIM_MULTI_CONDITIONAL)
{
- CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ CopyFromInsertBatch(cstate, estate, mycid, ti_options,
prevResultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
}
else
- CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+ CopyFromInsertBatch(cstate, estate, mycid, ti_options,
resultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
FreeExecutorState(estate);
- /*
- * If we skipped writing WAL, then we need to sync the heap (but not
- * indexes since those use WAL anyway)
- */
- if (hi_options & HEAP_INSERT_SKIP_WAL)
- heap_sync(cstate->rel);
+ table_finish_bulk_insert(cstate->rel, ti_options);
return processed;
}
*/
static void
CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
- int hi_options, ResultRelInfo *resultRelInfo,
+ int ti_options, ResultRelInfo *resultRelInfo,
TupleTableSlot *myslot, BulkInsertState bistate,
int nBufferedTuples, HeapTuple *bufferedTuples,
uint64 firstBufferedLineNo)
bufferedTuples,
nBufferedTuples,
mycid,
- hi_options,
+ ti_options,
bistate);
MemoryContextSwitchTo(oldcontext);
#include "access/heapam.h"
#include "access/reloptions.h"
#include "access/htup_details.h"
-#include "access/tableam.h"
#include "access/sysattr.h"
+#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
Relation rel; /* relation to write to */
ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */
CommandId output_cid; /* cmin to insert in output tuples */
- int hi_options; /* heap_insert performance options */
+ int ti_options; /* table_insert performance options */
BulkInsertState bistate; /* bulk insert state */
} DR_intorel;
* We can skip WAL-logging the insertions, unless PITR or streaming
* replication is in use. We can skip the FSM in any case.
*/
- myState->hi_options = HEAP_INSERT_SKIP_FSM |
- (XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
+ myState->ti_options = TABLE_INSERT_SKIP_FSM |
+ (XLogIsNeeded() ? 0 : TABLE_INSERT_SKIP_WAL);
myState->bistate = GetBulkInsertState();
/* Not using WAL requires smgr_targblock be initially invalid */
table_insert(myState->rel,
slot,
myState->output_cid,
- myState->hi_options,
+ myState->ti_options,
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
FreeBulkInsertState(myState->bistate);
- /* If we skipped using WAL, must heap_sync before commit */
- if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
- heap_sync(myState->rel);
+ table_finish_bulk_insert(myState->rel, myState->ti_options);
/* close rel, but keep lock until commit */
table_close(myState->rel, NoLock);
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/multixact.h"
+#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
/* These fields are filled by transientrel_startup: */
Relation transientrel; /* relation to write to */
CommandId output_cid; /* cmin to insert in output tuples */
- int hi_options; /* heap_insert performance options */
+ int ti_options; /* table_insert performance options */
BulkInsertState bistate; /* bulk insert state */
} DR_transientrel;
* as open scans.
*
* NB: We count on this to protect us against problems with refreshing the
- * data using HEAP_INSERT_FROZEN.
+ * data using TABLE_INSERT_FROZEN.
*/
CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
* We can skip WAL-logging the insertions, unless PITR or streaming
* replication is in use. We can skip the FSM in any case.
*/
- myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
+ myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
if (!XLogIsNeeded())
- myState->hi_options |= HEAP_INSERT_SKIP_WAL;
+ myState->ti_options |= TABLE_INSERT_SKIP_WAL;
myState->bistate = GetBulkInsertState();
/* Not using WAL requires smgr_targblock be initially invalid */
table_insert(myState->transientrel,
slot,
myState->output_cid,
- myState->hi_options,
+ myState->ti_options,
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
FreeBulkInsertState(myState->bistate);
- /* If we skipped using WAL, must heap_sync before commit */
- if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
- heap_sync(myState->transientrel);
+ table_finish_bulk_insert(myState->transientrel, myState->ti_options);
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
EState *estate;
CommandId mycid;
BulkInsertState bistate;
- int hi_options;
+ int ti_options;
ExprState *partqualstate = NULL;
/*
newrel = NULL;
/*
- * Prepare a BulkInsertState and options for heap_insert. Because we're
+ * Prepare a BulkInsertState and options for table_insert. Because we're
* building a new heap, we can skip WAL-logging and fsync it to disk at
* the end instead (unless WAL-logging is required for archiving or
* streaming replication). The FSM is empty too, so don't bother using it.
mycid = GetCurrentCommandId(true);
bistate = GetBulkInsertState();
- hi_options = HEAP_INSERT_SKIP_FSM;
+ ti_options = TABLE_INSERT_SKIP_FSM;
if (!XLogIsNeeded())
- hi_options |= HEAP_INSERT_SKIP_WAL;
+ ti_options |= TABLE_INSERT_SKIP_WAL;
}
else
{
/* keep compiler quiet about using these uninitialized */
mycid = 0;
bistate = NULL;
- hi_options = 0;
+ ti_options = 0;
}
/*
/* Write the tuple out to the new relation */
if (newrel)
- table_insert(newrel, insertslot, mycid, hi_options, bistate);
+ table_insert(newrel, insertslot, mycid, ti_options, bistate);
ResetExprContext(econtext);
{
FreeBulkInsertState(bistate);
- /* If we skipped writing WAL, then we need to sync the heap. */
- if (hi_options & HEAP_INSERT_SKIP_WAL)
- heap_sync(newrel);
+ table_finish_bulk_insert(newrel, ti_options);
table_close(newrel, NoLock);
}
uint8 flags,
TM_FailureData *tmfd);
+ /*
+ * Perform operations necessary to complete insertions made via
+ * tuple_insert and multi_insert with a BulkInsertState specified. This
+ * e.g. may e.g. used to flush the relation when inserting with
+ * TABLE_INSERT_SKIP_WAL specified.
+ *
+ * Typically callers of tuple_insert and multi_insert will just pass all
+ * the flags the apply to them, and each AM has to decide which of them
+ * make sense for it, and then only take actions in finish_bulk_insert
+ * that make sense for a specific AM.
+ *
+ * Optional callback.
+ */
+ void (*finish_bulk_insert) (Relation rel, int options);
+
/* ------------------------------------------------------------------------
* DDL related functionality.
*
*
* The BulkInsertState object (if any; bistate can be NULL for default
- * behavior) is also just passed through to RelationGetBufferForTuple.
+ * behavior) is also just passed through to RelationGetBufferForTuple. If
+ * `bistate` is provided, table_finish_bulk_insert() needs to be called.
*
* On return the slot's tts_tid and tts_tableOid are updated to reflect the
* insertion. But note that any toasting of fields within the slot is NOT
flags, tmfd);
}
+/*
+ * Perform operations necessary to complete insertions made via
+ * tuple_insert and multi_insert with a BulkInsertState specified. This
+ * e.g. may e.g. used to flush the relation when inserting with
+ * TABLE_INSERT_SKIP_WAL specified.
+ */
+static inline void
+table_finish_bulk_insert(Relation rel, int options)
+{
+ /* optional callback */
+ if (rel->rd_tableam && rel->rd_tableam->finish_bulk_insert)
+ rel->rd_tableam->finish_bulk_insert(rel, options);
+}
+
/* ------------------------------------------------------------------------
* DDL related functionality.