COMMIT
(3 rows)
+INSERT INTO replication_example(somedata, text) VALUES (1, 4);
+INSERT INTO replication_example(somedata, text) VALUES (1, 5);
+SELECT pg_current_wal_lsn() AS wal_lsn \gset
+INSERT INTO replication_example(somedata, text) VALUES (1, 6);
+SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
+ slot_name
+------------------
+ regression_slot2
+(1 row)
+
+SELECT :'wal_lsn' = :'end_lsn';
+ ?column?
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.replication_example: INSERT: id[integer]:6 somedata[integer]:1 text[character varying]:'6'
+ COMMIT
+(3 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
DROP TABLE replication_example;
-- error
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+INSERT INTO replication_example(somedata, text) VALUES (1, 4);
+INSERT INTO replication_example(somedata, text) VALUES (1, 5);
+
+SELECT pg_current_wal_lsn() AS wal_lsn \gset
+
+INSERT INTO replication_example(somedata, text) VALUES (1, 6);
+
+SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
+
+SELECT :'wal_lsn' = :'end_lsn';
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
DROP TABLE replication_example;
-- error
</entry>
</row>
+ <row>
+ <entry>
+ <indexterm>
+ <primary>pg_replication_slot_advance</primary>
+ </indexterm>
+ <literal><function>pg_replication_slot_advance(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>)</function></literal>
+ </entry>
+ <entry>
+ (<parameter>slot_name</parameter> <type>name</type>, <parameter>end_lsn</parameter> <type>pg_lsn</type>)
+ <type>bool</type>
+ </entry>
+ <entry>
+ Advances the current confirmed position of a replication slot named
+ <parameter>slot_name</parameter>. The slot will not be moved backwards,
+ and it will not be moved beyond the current insert location. Returns
+ name of the slot and real position to which it was advanced to.
+ </entry>
+ </row>
+
<row>
<entry id="pg-replication-origin-create">
<indexterm>
* call ReorderBufferProcessXid for each record type by default, because
* e.g. empty xacts can be handled more efficiently if there's no previous
* state for them.
+ *
+ * We also support the ability to fast forward thru records, skipping some
+ * record types completely - see individual record types for details.
*/
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
xl_invalidations *invalidations =
(xl_invalidations *) XLogRecGetData(r);
- ReorderBufferImmediateInvalidation(
- ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+ if (!ctx->fast_forward)
+ ReorderBufferImmediateInvalidation(ctx->reorder,
+ invalidations->nmsgs,
+ invalidations->msgs);
}
break;
default:
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
- /* no point in doing anything yet */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
return;
switch (info)
{
case XLOG_HEAP2_MULTI_INSERT:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (!ctx->fast_forward &&
+ SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeMultiInsert(ctx, buf);
break;
case XLOG_HEAP2_NEW_CID:
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
- /* no point in doing anything yet */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding data changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
return;
switch (info)
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
- /* No point in doing anything yet. */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding messages.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
return;
message = (xl_logical_message *) XLogRecGetData(r);
*/
if (parsed->nmsgs > 0)
{
- ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- parsed->nmsgs, parsed->msgs);
+ if (!ctx->fast_forward)
+ ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+ parsed->nmsgs, parsed->msgs);
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
}
* are restarting or if we haven't assembled a consistent snapshot yet.
* 2) The transaction happened in another database.
* 3) The output plugin is not interested in the origin.
+ * 4) We are doing fast-forwarding
*
* We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
- FilterByOrigin(ctx, origin_id))
+ ctx->fast_forward || FilterByOrigin(ctx, origin_id))
{
for (i = 0; i < parsed->nsubxacts; i++)
{
XLogRecPtr start_lsn,
TransactionId xmin_horizon,
bool need_full_snapshot,
+ bool fast_forward,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
* (re-)load output plugins, so we detect a bad (removed) output plugin
* now.
*/
- LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+ if (!fast_forward)
+ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
/*
* Now that the slot's xmin has been set, we can announce ourselves as a
ctx->output_plugin_options = output_plugin_options;
+ ctx->fast_forward = fast_forward;
+
MemoryContextSwitchTo(old_context);
return ctx;
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- need_full_snapshot, read_page, prepare_write,
- do_write, update_progress);
+ need_full_snapshot, true,
+ read_page, prepare_write, do_write,
+ update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
+ bool fast_forward,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- read_page, prepare_write, do_write,
- update_progress);
+ fast_forward, read_page, prepare_write,
+ do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "startup";
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "shutdown";
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "begin";
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "commit";
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "change";
ErrorContextCallback errcallback;
bool ret;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "filter_by_origin";
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
if (ctx->callbacks.message_cb == NULL)
return;
/* restart at slot's confirmed_flush */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
+ false,
logical_read_local_xlog_page,
LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL);
#include "miscadmin.h"
#include "access/htup_details.h"
+#include "replication/decode.h"
#include "replication/slot.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/pg_lsn.h"
+#include "utils/resowner.h"
static void
check_permissions(void)
return (Datum) 0;
}
+
+/*
+ * Helper function for advancing physical replication slot forward.
+ */
+static XLogRecPtr
+pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+{
+ XLogRecPtr retlsn = InvalidXLogRecPtr;
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ if (MyReplicationSlot->data.restart_lsn < moveto)
+ {
+ MyReplicationSlot->data.restart_lsn = moveto;
+ retlsn = moveto;
+ }
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ return retlsn;
+}
+
+/*
+ * Helper function for advancing logical replication slot forward.
+ */
+static XLogRecPtr
+pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+{
+ LogicalDecodingContext *ctx;
+ ResourceOwner old_resowner = CurrentResourceOwner;
+ XLogRecPtr retlsn = InvalidXLogRecPtr;
+
+ PG_TRY();
+ {
+ /* restart at slot's confirmed_flush */
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ NIL,
+ true,
+ logical_read_local_xlog_page,
+ NULL, NULL, NULL);
+
+ CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
+ "logical decoding");
+
+ /* invalidate non-timetravel entries */
+ InvalidateSystemCaches();
+
+ /* Decode until we run out of records */
+ while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
+ (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+ {
+ XLogRecord *record;
+ char *errm = NULL;
+
+ record = XLogReadRecord(ctx->reader, startlsn, &errm);
+ if (errm)
+ elog(ERROR, "%s", errm);
+
+ /*
+ * Now that we've set up the xlog reader state, subsequent calls
+ * pass InvalidXLogRecPtr to say "continue from last record"
+ */
+ startlsn = InvalidXLogRecPtr;
+
+ /*
+ * The {begin_txn,change,commit_txn}_wrapper callbacks above will
+ * store the description into our tuplestore.
+ */
+ if (record != NULL)
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+ /* check limits */
+ if (moveto <= ctx->reader->EndRecPtr)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ CurrentResourceOwner = old_resowner;
+
+ if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+ {
+ LogicalConfirmReceivedLocation(moveto);
+
+ /*
+ * If only the confirmed_flush_lsn has changed the slot won't get
+ * marked as dirty by the above. Callers on the walsender
+ * interface are expected to keep track of their own progress and
+ * don't need it written out. But SQL-interface users cannot
+ * specify their own start positions and it's harder for them to
+ * keep track of their progress, so we should make more of an
+ * effort to save it for them.
+ *
+ * Dirty the slot so it's written out at the next checkpoint.
+ * We'll still lose its position on crash, as documented, but it's
+ * better than always losing the position even on clean restart.
+ */
+ ReplicationSlotMarkDirty();
+ }
+
+ retlsn = MyReplicationSlot->data.confirmed_flush;
+
+ /* free context, call shutdown callback */
+ FreeDecodingContext(ctx);
+
+ InvalidateSystemCaches();
+ }
+ PG_CATCH();
+ {
+ /* clear all timetravel entries */
+ InvalidateSystemCaches();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return retlsn;
+}
+
+/*
+ * SQL function for moving the position in a replication slot.
+ */
+Datum
+pg_replication_slot_advance(PG_FUNCTION_ARGS)
+{
+ Name slotname = PG_GETARG_NAME(0);
+ XLogRecPtr moveto = PG_GETARG_LSN(1);
+ XLogRecPtr endlsn;
+ XLogRecPtr startlsn;
+ TupleDesc tupdesc;
+ Datum values[2];
+ bool nulls[2];
+ HeapTuple tuple;
+ Datum result;
+
+ Assert(!MyReplicationSlot);
+
+ check_permissions();
+
+ if (XLogRecPtrIsInvalid(moveto))
+ ereport(ERROR,
+ (errmsg("invalid target wal lsn")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ /*
+ * We can't move slot past what's been flushed/replayed so clamp the
+ * target possition accordingly.
+ */
+ if (!RecoveryInProgress())
+ moveto = Min(moveto, GetFlushRecPtr());
+ else
+ moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
+
+ /* Acquire the slot so we "own" it */
+ ReplicationSlotAcquire(NameStr(*slotname), true);
+
+ startlsn = MyReplicationSlot->data.confirmed_flush;
+ if (moveto < startlsn)
+ {
+ ReplicationSlotRelease();
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move slot to %X/%X, minimum is %X/%X",
+ (uint32) (moveto >> 32), (uint32) moveto,
+ (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
+ (uint32) (MyReplicationSlot->data.confirmed_flush))));
+ }
+
+ if (OidIsValid(MyReplicationSlot->data.database))
+ endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+ else
+ endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+
+ values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+ nulls[0] = false;
+
+ /* Update the on disk state when lsn was updated. */
+ if (XLogRecPtrIsInvalid(endlsn))
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+ ReplicationSlotSave();
+ }
+
+ ReplicationSlotRelease();
+
+ /* Return the reached position. */
+ values[1] = LSNGetDatum(endlsn);
+ nulls[1] = false;
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+
+ PG_RETURN_DATUM(result);
+}
* to be shipped from that position.
*/
logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
+ false,
logical_read_xlog_page,
WalSndPrepareWrite,
WalSndWriteData,
DESCR("peek at changes from replication slot");
DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3878 ( pg_replication_slot_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 3220" "{19,3220,19,3220}" "{i,i,o,o}" "{slot_name,upto_lsn,slot_name,end_lsn}" _null_ _null_ pg_replication_slot_advance _null_ _null_ _null_ ));
+DESCR("advance logical replication slot");
DATA(insert OID = 3577 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ ));
DESCR("emit a textual logical decoding message");
DATA(insert OID = 3578 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ ));
struct ReorderBuffer *reorder;
struct SnapBuild *snapshot_builder;
+ /*
+ * Marks the logical decoding context as fast forward decoding one.
+ * Such a context does not have plugin loaded so most of the the following
+ * properties are unused.
+ */
+ bool fast_forward;
+
OutputPluginCallbacks callbacks;
OutputPluginOptions options;
extern LogicalDecodingContext *CreateDecodingContext(
XLogRecPtr start_lsn,
List *output_plugin_options,
+ bool fast_forward,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,