REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot
+ spill slot truncate
regresscheck: | submake-regress submake-test_decoding temp-install
$(pg_regress_check) \
--- /dev/null
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE tab1 (id serial unique, data int);
+CREATE TABLE tab2 (a int primary key, b int);
+TRUNCATE tab1;
+TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
+TRUNCATE tab1, tab2;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------------------------------------------------------
+ BEGIN
+ table public.tab1: TRUNCATE: (no-flags)
+ COMMIT
+ BEGIN
+ table public.tab1: TRUNCATE: restart_seqs cascade
+ COMMIT
+ BEGIN
+ table public.tab1, public.tab2: TRUNCATE: (no-flags)
+ COMMIT
+(9 rows)
+
--- /dev/null
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE tab1 (id serial unique, data int);
+CREATE TABLE tab2 (a int primary key, b int);
+
+TRUNCATE tab1;
+TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
+TRUNCATE tab1, tab2;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
+static void pg_decode_truncate(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations, Relation relations[],
+ ReorderBufferChange *change);
static bool pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pg_decode_message(LogicalDecodingContext *ctx,
cb->startup_cb = pg_decode_startup;
cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change;
+ cb->truncate_cb = pg_decode_truncate;
cb->commit_cb = pg_decode_commit_txn;
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+ TestDecodingData *data;
+ MemoryContext old;
+ int i;
+
+ data = ctx->output_plugin_private;
+
+ /* output BEGIN if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_begin(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
+ /* Avoid leaking memory by using and resetting our own context */
+ old = MemoryContextSwitchTo(data->context);
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfoString(ctx->out, "table ");
+
+ for (i = 0; i < nrelations; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(ctx->out, ", ");
+
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
+ NameStr(relations[i]->rd_rel->relname)));
+ }
+
+ appendStringInfoString(ctx->out, ": TRUNCATE:");
+
+ if (change->data.truncate.restart_seqs
+ || change->data.truncate.cascade)
+ {
+ if (change->data.truncate.restart_seqs)
+ appendStringInfo(ctx->out, " restart_seqs");
+ if (change->data.truncate.cascade)
+ appendStringInfo(ctx->out, " cascade");
+ }
+ else
+ appendStringInfoString(ctx->out, " (no-flags)");
+
+ MemoryContextSwitchTo(old);
+ MemoryContextReset(data->context);
+
+ OutputPluginWrite(ctx, true);
+}
+
static void
pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
+ LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
The <function>begin_cb</function>, <function>change_cb</function>
and <function>commit_cb</function> callbacks are required,
while <function>startup_cb</function>,
- <function>filter_by_origin_cb</function>
+ <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
and <function>shutdown_cb</function> are optional.
+ If <function>truncate_cb</function> is not set but a
+ <command>TRUNCATE</command> is to be decoded, the action will be ignored.
</para>
</sect2>
</note>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-truncate">
+ <title>Truncate Callback</title>
+
+ <para>
+ The <function>truncate_cb</function> callback is called for a
+ <command>TRUNCATE</command> command.
+<programlisting>
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+</programlisting>
+ The parameters are analogous to the <function>change_cb</function>
+ callback. However, because <command>TRUNCATE</command> actions on
+ tables connected by foreign keys need to be executed together, this
+ callback receives an array of relations instead of just a single one.
+ See the description of the <xref linkend="sql-truncate"/> statement for
+ details.
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-filter-origin">
<title>Origin Filter Callback</title>
case XLOG_HEAP_UPDATE:
heap_xlog_update(record, false);
break;
+ case XLOG_HEAP_TRUNCATE:
+ /*
+ * TRUNCATE is a no-op because the actions are already logged as
+ * SMGR WAL records. TRUNCATE WAL record only exists for logical
+ * decoding.
+ */
+ break;
case XLOG_HEAP_HOT_UPDATE:
heap_xlog_update(record, true);
break;
xlrec->new_offnum,
xlrec->new_xmax);
}
+ else if (info == XLOG_HEAP_TRUNCATE)
+ {
+ xl_heap_truncate *xlrec = (xl_heap_truncate *) rec;
+ int i;
+
+ if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+ appendStringInfo(buf, "cascade ");
+ if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+ appendStringInfo(buf, "restart_seqs ");
+ appendStringInfo(buf, "nrelids %u relids", xlrec->nrelids);
+ for (i = 0; i < xlrec->nrelids; i++)
+ appendStringInfo(buf, " %u", xlrec->relids[i]);
+ }
else if (info == XLOG_HEAP_CONFIRM)
{
xl_heap_confirm *xlrec = (xl_heap_confirm *) rec;
case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE:
id = "HOT_UPDATE+INIT";
break;
+ case XLOG_HEAP_TRUNCATE:
+ id = "TRUNCATE";
+ break;
case XLOG_HEAP_CONFIRM:
id = "HEAP_CONFIRM";
break;
#include "access/genam.h"
#include "access/heapam.h"
+#include "access/heapam_xlog.h"
#include "access/multixact.h"
#include "access/reloptions.h"
#include "access/relscan.h"
{
List *rels = NIL;
List *relids = NIL;
- List *seq_relids = NIL;
- EState *estate;
- ResultRelInfo *resultRelInfos;
- ResultRelInfo *resultRelInfo;
- SubTransactionId mySubid;
+ List *relids_logged = NIL;
ListCell *cell;
/*
truncate_check_rel(rel);
rels = lappend(rels, rel);
relids = lappend_oid(relids, myrelid);
+ /* Log this relation only if needed for logical decoding */
+ if (RelationIsLogicallyLogged(rel))
+ relids_logged = lappend_oid(relids_logged, myrelid);
if (recurse)
{
truncate_check_rel(rel);
rels = lappend(rels, rel);
relids = lappend_oid(relids, childrelid);
+ /* Log this relation only if needed for logical decoding */
+ if (RelationIsLogicallyLogged(rel))
+ relids_logged = lappend_oid(relids_logged, childrelid);
}
}
else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly.")));
}
+ ExecuteTruncateGuts(rels, relids, relids_logged,
+ stmt->behavior, stmt->restart_seqs);
+
+ /* And close the rels */
+ foreach(cell, rels)
+ {
+ Relation rel = (Relation) lfirst(cell);
+
+ heap_close(rel, NoLock);
+ }
+}
+
+/*
+ * ExecuteTruncateGuts
+ *
+ * Internal implementation of TRUNCATE. This is called by the actual TRUNCATE
+ * command (see above) as well as replication subscribers that execute a
+ * replicated TRUNCATE action.
+ *
+ * explicit_rels is the list of Relations to truncate that the command
+ * specified. relids is the list of Oids corresponding to explicit_rels.
+ * relids_logged is the list of Oids (a subset of relids) that require
+ * WAL-logging. This is all a bit redundant, but the existing callers have
+ * this information handy in this form.
+ */
+void
+ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
+ DropBehavior behavior, bool restart_seqs)
+{
+ List *rels;
+ List *seq_relids = NIL;
+ EState *estate;
+ ResultRelInfo *resultRelInfos;
+ ResultRelInfo *resultRelInfo;
+ SubTransactionId mySubid;
+ ListCell *cell;
+ Oid *logrelids;
+
/*
+ * Open, exclusive-lock, and check all the explicitly-specified relations
+ *
* In CASCADE mode, suck in all referencing relations as well. This
* requires multiple iterations to find indirectly-dependent relations. At
* each phase, we need to exclusive-lock new rels before looking for their
* soon as we open it, to avoid a faux pas such as holding lock for a long
* time on a rel we have no permissions for.
*/
- if (stmt->behavior == DROP_CASCADE)
+ rels = list_copy(explicit_rels);
+ if (behavior == DROP_CASCADE)
{
for (;;)
{
truncate_check_rel(rel);
rels = lappend(rels, rel);
relids = lappend_oid(relids, relid);
+ /* Log this relation only if needed for logical decoding */
+ if (RelationIsLogicallyLogged(rel))
+ relids_logged = lappend_oid(relids_logged, relid);
}
}
}
#ifdef USE_ASSERT_CHECKING
heap_truncate_check_FKs(rels, false);
#else
- if (stmt->behavior == DROP_RESTRICT)
+ if (behavior == DROP_RESTRICT)
heap_truncate_check_FKs(rels, false);
#endif
* We want to do this early since it's pointless to do all the truncation
* work only to fail on sequence permissions.
*/
- if (stmt->restart_seqs)
+ if (restart_seqs)
{
foreach(cell, rels)
{
ResetSequence(seq_relid);
}
+ /*
+ * Write a WAL record to allow this set of actions to be logically decoded.
+ *
+ * Assemble an array of relids so we can write a single WAL record for the
+ * whole action.
+ */
+ if (list_length(relids_logged) > 0)
+ {
+ xl_heap_truncate xlrec;
+ int i = 0;
+
+ /* should only get here if wal_level >= logical */
+ Assert(XLogLogicalInfoActive());
+
+ logrelids = palloc(list_length(relids_logged) * sizeof(Oid));
+ foreach (cell, relids_logged)
+ logrelids[i++] = lfirst_oid(cell);
+
+ xlrec.dbId = MyDatabaseId;
+ xlrec.nrelids = list_length(relids_logged);
+ xlrec.flags = 0;
+ if (behavior == DROP_CASCADE)
+ xlrec.flags |= XLH_TRUNCATE_CASCADE;
+ if (restart_seqs)
+ xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate);
+ XLogRegisterData((char *) logrelids, list_length(relids_logged) * sizeof(Oid));
+
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ (void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE);
+ }
+
/*
* Process all AFTER STATEMENT TRUNCATE triggers.
*/
/* We can clean up the EState now */
FreeExecutorState(estate);
- /* And close the rels (can't do this while EState still holds refs) */
+ /*
+ * Close any rels opened by CASCADE (can't do this while EState still
+ * holds refs)
+ */
+ rels = list_difference_ptr(rels, explicit_rels);
foreach(cell, rels)
{
Relation rel = (Relation) lfirst(cell);
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
DecodeDelete(ctx, buf);
break;
+ case XLOG_HEAP_TRUNCATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeTruncate(ctx, buf);
+ break;
+
case XLOG_HEAP_INPLACE:
/*
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}
+/*
+ * Parse XLOG_HEAP_TRUNCATE from wal
+ */
+static void
+DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_heap_truncate *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_heap_truncate *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ if (xlrec->dbId != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+ change->origin_id = XLogRecGetOrigin(r);
+ if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+ change->data.truncate.cascade = true;
+ if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+ change->data.truncate.restart_seqs = true;
+ change->data.truncate.nrelids = xlrec->nrelids;
+ change->data.truncate.relids = palloc(xlrec->nrelids * sizeof(Oid));
+ memcpy(change->data.truncate.relids, xlrec->relids,
+ xlrec->nrelids * sizeof(Oid));
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+ buf->origptr, change);
+}
+
/*
* Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
*
XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
+static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change);
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
/* wrap output plugin callbacks, so we can add error context information */
ctx->reorder->begin = begin_cb_wrapper;
ctx->reorder->apply_change = change_cb_wrapper;
+ ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
error_context_stack = errcallback.previous;
}
+static void
+truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ if (!ctx->callbacks.truncate_cb)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "truncate";
+ state.report_location = change->lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this change's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = change->lsn;
+
+ ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
bool
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
break;
}
specinsert = change;
break;
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
+ {
+ int i;
+ int nrelids = change->data.truncate.nrelids;
+ int nrelations = 0;
+ Relation *relations;
+
+ relations = palloc0(nrelids * sizeof(Relation));
+ for (i = 0; i < nrelids; i++)
+ {
+ Oid relid = change->data.truncate.relids[i];
+ Relation relation;
+
+ relation = RelationIdGetRelation(relid);
+
+ if (relation == NULL)
+ elog(ERROR, "could not open relation with OID %u", relid);
+
+ if (!RelationIsLogicallyLogged(relation))
+ continue;
+
+ relations[nrelations++] = relation;
+ }
+
+ rb->apply_truncate(rb, txn, nrelations, relations, change);
+
+ for (i = 0; i < nrelations; i++)
+ RelationClose(relations[i]);
+
+ break;
+ }
+
case REORDER_BUFFER_CHANGE_MESSAGE:
rb->message(rb, txn, change->lsn, true,
change->data.msg.prefix,
}
break;
}
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
}
/* the base struct contains all the data, easy peasy */
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
#define XLOG_HEAP_INSERT 0x00
#define XLOG_HEAP_DELETE 0x10
#define XLOG_HEAP_UPDATE 0x20
-/* 0x030 is free, was XLOG_HEAP_MOVE */
+#define XLOG_HEAP_TRUNCATE 0x30
#define XLOG_HEAP_HOT_UPDATE 0x40
#define XLOG_HEAP_CONFIRM 0x50
#define XLOG_HEAP_LOCK 0x60
#define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8))
+/*
+ * xl_heap_delete flag values, 8 bits are available.
+ */
+#define XLH_TRUNCATE_CASCADE (1<<0)
+#define XLH_TRUNCATE_RESTART_SEQS (1<<1)
+
+/*
+ * For truncate we list all truncated relids in an array, followed by all
+ * sequence relids that need to be restarted, if any.
+ * All rels are always within the same database, so we just list dbid once.
+ */
+typedef struct xl_heap_truncate
+{
+ Oid dbId;
+ uint32 nrelids;
+ uint8 flags;
+ Oid relids[FLEXIBLE_ARRAY_MEMBER];
+} xl_heap_truncate;
+
+#define SizeOfHeapTruncate (offsetof(xl_heap_truncate, relids))
+
/*
* We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
* or updated tuple in WAL; we can save a few bytes by reconstructing the
extern void CheckTableNotInUse(Relation rel, const char *stmt);
extern void ExecuteTruncate(TruncateStmt *stmt);
+extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
+ DropBehavior behavior, bool restart_seqs);
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
Relation relation,
ReorderBufferChange *change);
+/*
+ * Callback for every TRUNCATE in a successful transaction.
+ */
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
/*
* Called for every (explicit or implicit) COMMIT of a successful transaction.
*/
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
+ LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
- REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
+ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
+ REORDER_BUFFER_CHANGE_TRUNCATE
};
/*
ReorderBufferTupleBuf *newtuple;
} tp;
+ /*
+ * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing
+ * one set of relations to be truncated.
+ */
+ struct
+ {
+ Size nrelids;
+ bool cascade;
+ bool restart_seqs;
+ Oid *relids;
+ } truncate;
+
/* Message with arbitrary data. */
struct
{
Relation relation,
ReorderBufferChange *change);
+/* truncate callback signature */
+typedef void (*ReorderBufferApplyTruncateCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
/* begin callback signature */
typedef void (*ReorderBufferBeginCB) (
ReorderBuffer *rb,
*/
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
+ ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;