]> granicus.if.org Git - postgresql/commitdiff
Ability to advance replication slots
authorSimon Riggs <simon@2ndQuadrant.com>
Wed, 17 Jan 2018 11:38:34 +0000 (11:38 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Wed, 17 Jan 2018 11:38:34 +0000 (11:38 +0000)
Ability to advance both physical and logical replication slots using a
new user function pg_replication_slot_advance().

For logical advance that means records are consumed as fast as possible
and changes are not given to output plugin for sending. Makes 2nd phase
(after we reached SNAPBUILD_FULL_SNAPSHOT) of replication slot creation
faster, especially when there are big transactions as the reorder buffer
does not have to deal with data changes and does not have to spill to
disk.

Author: Petr Jelinek
Reviewed-by: Simon Riggs
contrib/test_decoding/expected/slot.out
contrib/test_decoding/sql/slot.sql
doc/src/sgml/func.sgml
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/catalog/pg_proc.h
src/include/replication/logical.h

index 9f5f8a9b76622e23ea711692de04a29e597528fe..21e9d56f73b739503acd1447ee2639c9f7d947fa 100644 (file)
@@ -92,6 +92,36 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'in
  COMMIT
 (3 rows)
 
+INSERT INTO replication_example(somedata, text) VALUES (1, 4);
+INSERT INTO replication_example(somedata, text) VALUES (1, 5);
+SELECT pg_current_wal_lsn() AS wal_lsn \gset
+INSERT INTO replication_example(somedata, text) VALUES (1, 6);
+SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
+    slot_name     
+------------------
+ regression_slot2
+(1 row)
+
+SELECT :'wal_lsn' = :'end_lsn';
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                                  data                                                   
+---------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.replication_example: INSERT: id[integer]:6 somedata[integer]:1 text[character varying]:'6'
+ COMMIT
+(3 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
 DROP TABLE replication_example;
 -- error
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
index fa9561f54ea842a081bcd4d8e056f0d62c5a7c19..706340c1d8d25a12a8d49d1e72d51671418c0bb0 100644 (file)
@@ -45,6 +45,21 @@ INSERT INTO replication_example(somedata, text) VALUES (1, 3);
 SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
+INSERT INTO replication_example(somedata, text) VALUES (1, 4);
+INSERT INTO replication_example(somedata, text) VALUES (1, 5);
+
+SELECT pg_current_wal_lsn() AS wal_lsn \gset
+
+INSERT INTO replication_example(somedata, text) VALUES (1, 6);
+
+SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
+
+SELECT :'wal_lsn' = :'end_lsn';
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
 DROP TABLE replication_example;
 
 -- error
index 2428434030ba4da4bc29aa8dcc519543c0361824..487c7ff75073e38ea087a6f8c881e3d864b06b91 100644 (file)
@@ -19155,6 +19155,25 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        </entry>
       </row>
 
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_replication_slot_advance</primary>
+        </indexterm>
+        <literal><function>pg_replication_slot_advance(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>end_lsn</parameter> <type>pg_lsn</type>)
+        <type>bool</type>
+       </entry>
+       <entry>
+        Advances the current confirmed position of a replication slot named
+        <parameter>slot_name</parameter>. The slot will not be moved backwards,
+        and it will not be moved beyond the current insert location.  Returns
+        name of the slot and real position to which it was advanced to.
+       </entry>
+      </row>
+
       <row>
        <entry id="pg-replication-origin-create">
         <indexterm>
index 537eba7875c5e454462f667ea358fd1c51788cca..6eb0d5527e0b8c1bd9d67b34deeeac921e9bb154 100644 (file)
@@ -88,6 +88,9 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
  * call ReorderBufferProcessXid for each record type by default, because
  * e.g. empty xacts can be handled more efficiently if there's no previous
  * state for them.
+ *
+ * We also support the ability to fast forward thru records, skipping some
+ * record types completely - see individual record types for details.
  */
 void
 LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
@@ -332,8 +335,10 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                                xl_invalidations *invalidations =
                                (xl_invalidations *) XLogRecGetData(r);
 
-                               ReorderBufferImmediateInvalidation(
-                                                                                                  ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+                               if (!ctx->fast_forward)
+                                       ReorderBufferImmediateInvalidation(ctx->reorder,
+                                                                                                          invalidations->nmsgs,
+                                                                                                          invalidations->msgs);
                        }
                        break;
                default:
@@ -353,14 +358,19 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
        ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
-       /* no point in doing anything yet */
-       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+       /*
+        * If we don't have snapshot or we are just fast-forwarding, there is no
+        * point in decoding changes.
+        */
+       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+               ctx->fast_forward)
                return;
 
        switch (info)
        {
                case XLOG_HEAP2_MULTI_INSERT:
-                       if (SnapBuildProcessChange(builder, xid, buf->origptr))
+                       if (!ctx->fast_forward &&
+                               SnapBuildProcessChange(builder, xid, buf->origptr))
                                DecodeMultiInsert(ctx, buf);
                        break;
                case XLOG_HEAP2_NEW_CID:
@@ -408,8 +418,12 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
        ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
-       /* no point in doing anything yet */
-       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+       /*
+        * If we don't have snapshot or we are just fast-forwarding, there is no
+        * point in decoding data changes.
+        */
+       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+               ctx->fast_forward)
                return;
 
        switch (info)
@@ -501,8 +515,12 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
        ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
 
-       /* No point in doing anything yet. */
-       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+       /*
+        * If we don't have snapshot or we are just fast-forwarding, there is no
+        * point in decoding messages.
+        */
+       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+               ctx->fast_forward)
                return;
 
        message = (xl_logical_message *) XLogRecGetData(r);
@@ -554,8 +572,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
         */
        if (parsed->nmsgs > 0)
        {
-               ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
-                                                                         parsed->nmsgs, parsed->msgs);
+               if (!ctx->fast_forward)
+                       ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+                                                                                 parsed->nmsgs, parsed->msgs);
                ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
        }
 
@@ -574,6 +593,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
         *        are restarting or if we haven't assembled a consistent snapshot yet.
         * 2) The transaction happened in another database.
         * 3) The output plugin is not interested in the origin.
+        * 4) We are doing fast-forwarding
         *
         * We can't just use ReorderBufferAbort() here, because we need to execute
         * the transaction's invalidations.  This currently won't be needed if
@@ -589,7 +609,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
         */
        if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
                (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
-               FilterByOrigin(ctx, origin_id))
+               ctx->fast_forward || FilterByOrigin(ctx, origin_id))
        {
                for (i = 0; i < parsed->nsubxacts; i++)
                {
index 2fc9d7d70ff7fd665481c105769ef06d882ca464..7637efc32e055f45eb3ae0c6534797163bc8b9b2 100644 (file)
@@ -115,6 +115,7 @@ StartupDecodingContext(List *output_plugin_options,
                                           XLogRecPtr start_lsn,
                                           TransactionId xmin_horizon,
                                           bool need_full_snapshot,
+                                          bool fast_forward,
                                           XLogPageReadCB read_page,
                                           LogicalOutputPluginWriterPrepareWrite prepare_write,
                                           LogicalOutputPluginWriterWrite do_write,
@@ -140,7 +141,8 @@ StartupDecodingContext(List *output_plugin_options,
         * (re-)load output plugins, so we detect a bad (removed) output plugin
         * now.
         */
-       LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+       if (!fast_forward)
+               LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
 
        /*
         * Now that the slot's xmin has been set, we can announce ourselves as a
@@ -191,6 +193,8 @@ StartupDecodingContext(List *output_plugin_options,
 
        ctx->output_plugin_options = output_plugin_options;
 
+       ctx->fast_forward = fast_forward;
+
        MemoryContextSwitchTo(old_context);
 
        return ctx;
@@ -303,8 +307,9 @@ CreateInitDecodingContext(char *plugin,
        ReplicationSlotSave();
 
        ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-                                                                need_full_snapshot, read_page, prepare_write,
-                                                                do_write, update_progress);
+                                                                need_full_snapshot, true,
+                                                                read_page, prepare_write, do_write,
+                                                                update_progress);
 
        /* call output plugin initialization callback */
        old_context = MemoryContextSwitchTo(ctx->context);
@@ -342,6 +347,7 @@ CreateInitDecodingContext(char *plugin,
 LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
                                          List *output_plugin_options,
+                                         bool fast_forward,
                                          XLogPageReadCB read_page,
                                          LogicalOutputPluginWriterPrepareWrite prepare_write,
                                          LogicalOutputPluginWriterWrite do_write,
@@ -395,8 +401,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
        ctx = StartupDecodingContext(output_plugin_options,
                                                                 start_lsn, InvalidTransactionId, false,
-                                                                read_page, prepare_write, do_write,
-                                                                update_progress);
+                                                                fast_forward, read_page, prepare_write,
+                                                                do_write, update_progress);
 
        /* call output plugin initialization callback */
        old_context = MemoryContextSwitchTo(ctx->context);
@@ -573,6 +579,8 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
        LogicalErrorCallbackState state;
        ErrorContextCallback errcallback;
 
+       Assert(!ctx->fast_forward);
+
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
        state.callback_name = "startup";
@@ -598,6 +606,8 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
        LogicalErrorCallbackState state;
        ErrorContextCallback errcallback;
 
+       Assert(!ctx->fast_forward);
+
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
        state.callback_name = "shutdown";
@@ -629,6 +639,8 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
        LogicalErrorCallbackState state;
        ErrorContextCallback errcallback;
 
+       Assert(!ctx->fast_forward);
+
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
        state.callback_name = "begin";
@@ -658,6 +670,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        LogicalErrorCallbackState state;
        ErrorContextCallback errcallback;
 
+       Assert(!ctx->fast_forward);
+
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
        state.callback_name = "commit";
@@ -687,6 +701,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        LogicalErrorCallbackState state;
        ErrorContextCallback errcallback;
 
+       Assert(!ctx->fast_forward);
+
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
        state.callback_name = "change";
@@ -721,6 +737,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
        ErrorContextCallback errcallback;
        bool            ret;
 
+       Assert(!ctx->fast_forward);
+
        /* Push callback + info on the error context stack */
        state.ctx = ctx;
        state.callback_name = "filter_by_origin";
@@ -751,6 +769,8 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        LogicalErrorCallbackState state;
        ErrorContextCallback errcallback;
 
+       Assert(!ctx->fast_forward);
+
        if (ctx->callbacks.message_cb == NULL)
                return;
 
index 9aab6e71b2ee55389b2dfe25e661f7cb3422f4be..54c25f1f5b2cf26245c38ffd67a5108188d9fb44 100644 (file)
@@ -251,6 +251,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                /* restart at slot's confirmed_flush */
                ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                                                        options,
+                                                                       false,
                                                                        logical_read_local_xlog_page,
                                                                        LogicalOutputPrepareWrite,
                                                                        LogicalOutputWrite, NULL);
index b02df593e9c6f95c3a2aa86623b40bfc4fc9cf09..93d2e20f760b10efcdb9b41b1d958bd4ab356c28 100644 (file)
 #include "miscadmin.h"
 
 #include "access/htup_details.h"
+#include "replication/decode.h"
 #include "replication/slot.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/pg_lsn.h"
+#include "utils/resowner.h"
 
 static void
 check_permissions(void)
@@ -312,3 +315,200 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
        return (Datum) 0;
 }
+
+/*
+ * Helper function for advancing physical replication slot forward.
+ */
+static XLogRecPtr
+pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+{
+       XLogRecPtr      retlsn = InvalidXLogRecPtr;
+
+       SpinLockAcquire(&MyReplicationSlot->mutex);
+       if (MyReplicationSlot->data.restart_lsn < moveto)
+       {
+               MyReplicationSlot->data.restart_lsn = moveto;
+               retlsn = moveto;
+       }
+       SpinLockRelease(&MyReplicationSlot->mutex);
+
+       return retlsn;
+}
+
+/*
+ * Helper function for advancing logical replication slot forward.
+ */
+static XLogRecPtr
+pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+{
+       LogicalDecodingContext *ctx;
+       ResourceOwner   old_resowner = CurrentResourceOwner;
+       XLogRecPtr              retlsn = InvalidXLogRecPtr;
+
+       PG_TRY();
+       {
+               /* restart at slot's confirmed_flush */
+               ctx = CreateDecodingContext(InvalidXLogRecPtr,
+                                                                       NIL,
+                                                                       true,
+                                                                       logical_read_local_xlog_page,
+                                                                       NULL, NULL, NULL);
+
+               CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
+                                                                                                  "logical decoding");
+
+               /* invalidate non-timetravel entries */
+               InvalidateSystemCaches();
+
+               /* Decode until we run out of records */
+               while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
+                          (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+               {
+                       XLogRecord *record;
+                       char       *errm = NULL;
+
+                       record = XLogReadRecord(ctx->reader, startlsn, &errm);
+                       if (errm)
+                               elog(ERROR, "%s", errm);
+
+                       /*
+                        * Now that we've set up the xlog reader state, subsequent calls
+                        * pass InvalidXLogRecPtr to say "continue from last record"
+                        */
+                       startlsn = InvalidXLogRecPtr;
+
+                       /*
+                        * The {begin_txn,change,commit_txn}_wrapper callbacks above will
+                        * store the description into our tuplestore.
+                        */
+                       if (record != NULL)
+                               LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+                       /* check limits */
+                       if (moveto <= ctx->reader->EndRecPtr)
+                               break;
+
+                       CHECK_FOR_INTERRUPTS();
+               }
+
+               CurrentResourceOwner = old_resowner;
+
+               if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+               {
+                       LogicalConfirmReceivedLocation(moveto);
+
+                       /*
+                        * If only the confirmed_flush_lsn has changed the slot won't get
+                        * marked as dirty by the above. Callers on the walsender
+                        * interface are expected to keep track of their own progress and
+                        * don't need it written out. But SQL-interface users cannot
+                        * specify their own start positions and it's harder for them to
+                        * keep track of their progress, so we should make more of an
+                        * effort to save it for them.
+                        *
+                        * Dirty the slot so it's written out at the next checkpoint.
+                        * We'll still lose its position on crash, as documented, but it's
+                        * better than always losing the position even on clean restart.
+                        */
+                       ReplicationSlotMarkDirty();
+               }
+
+               retlsn = MyReplicationSlot->data.confirmed_flush;
+
+               /* free context, call shutdown callback */
+               FreeDecodingContext(ctx);
+
+               InvalidateSystemCaches();
+       }
+       PG_CATCH();
+       {
+               /* clear all timetravel entries */
+               InvalidateSystemCaches();
+
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       return retlsn;
+}
+
+/*
+ * SQL function for moving the position in a replication slot.
+ */
+Datum
+pg_replication_slot_advance(PG_FUNCTION_ARGS)
+{
+       Name            slotname = PG_GETARG_NAME(0);
+       XLogRecPtr      moveto = PG_GETARG_LSN(1);
+       XLogRecPtr      endlsn;
+       XLogRecPtr      startlsn;
+       TupleDesc       tupdesc;
+       Datum           values[2];
+       bool            nulls[2];
+       HeapTuple       tuple;
+       Datum           result;
+
+       Assert(!MyReplicationSlot);
+
+       check_permissions();
+
+       if (XLogRecPtrIsInvalid(moveto))
+               ereport(ERROR,
+                               (errmsg("invalid target wal lsn")));
+
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       /*
+        * We can't move slot past what's been flushed/replayed so clamp the
+        * target possition accordingly.
+        */
+       if (!RecoveryInProgress())
+               moveto = Min(moveto, GetFlushRecPtr());
+       else
+               moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
+
+       /* Acquire the slot so we "own" it */
+       ReplicationSlotAcquire(NameStr(*slotname), true);
+
+       startlsn = MyReplicationSlot->data.confirmed_flush;
+       if (moveto < startlsn)
+       {
+               ReplicationSlotRelease();
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot move slot to %X/%X, minimum is %X/%X",
+                                               (uint32) (moveto >> 32), (uint32) moveto,
+                                               (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
+                                               (uint32) (MyReplicationSlot->data.confirmed_flush))));
+       }
+
+       if (OidIsValid(MyReplicationSlot->data.database))
+               endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+       else
+               endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+
+       values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+       nulls[0] = false;
+
+       /* Update the on disk state when lsn was updated. */
+       if (XLogRecPtrIsInvalid(endlsn))
+       {
+               ReplicationSlotMarkDirty();
+               ReplicationSlotsComputeRequiredXmin(false);
+               ReplicationSlotsComputeRequiredLSN();
+               ReplicationSlotSave();
+       }
+
+       ReplicationSlotRelease();
+
+       /* Return the reached position. */
+       values[1] = LSNGetDatum(endlsn);
+       nulls[1] = false;
+
+       tuple = heap_form_tuple(tupdesc, values, nulls);
+       result = HeapTupleGetDatum(tuple);
+
+       PG_RETURN_DATUM(result);
+}
index 8bef3fbdaf6d9a99bf74cabb0d01f01d8a5fa36d..130ecd5559b6c8148845e273b2991884ec051613 100644 (file)
@@ -1075,6 +1075,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
         * to be shipped from that position.
         */
        logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
+                                                                                                false,
                                                                                                 logical_read_xlog_page,
                                                                                                 WalSndPrepareWrite,
                                                                                                 WalSndWriteData,
index 298e0ae2f0cfef529516850dd30897552a8f00f5..f01648c9616dc6a115f378efec887212f2244aed 100644 (file)
@@ -5357,6 +5357,8 @@ DATA(insert OID = 3784 (  pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
 DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 (  pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
 DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3878 (  pg_replication_slot_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 3220" "{19,3220,19,3220}" "{i,i,o,o}" "{slot_name,upto_lsn,slot_name,end_lsn}" _null_ _null_ pg_replication_slot_advance _null_ _null_ _null_ ));
+DESCR("advance logical replication slot");
 DATA(insert OID = 3577 (  pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ ));
 DESCR("emit a textual logical decoding message");
 DATA(insert OID = 3578 (  pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ ));
index d9059e1cca634e6dbbca576bc303c639f42bcfb4..619c5f4d73e59222876c8cb8d551206e784fc87a 100644 (file)
@@ -45,6 +45,13 @@ typedef struct LogicalDecodingContext
        struct ReorderBuffer *reorder;
        struct SnapBuild *snapshot_builder;
 
+       /*
+        * Marks the logical decoding context as fast forward decoding one.
+        * Such a context does not have plugin loaded so most of the the following
+        * properties are unused.
+        */
+       bool fast_forward;
+
        OutputPluginCallbacks callbacks;
        OutputPluginOptions options;
 
@@ -97,6 +104,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 extern LogicalDecodingContext *CreateDecodingContext(
                                          XLogRecPtr start_lsn,
                                          List *output_plugin_options,
+                                         bool fast_forward,
                                          XLogPageReadCB read_page,
                                          LogicalOutputPluginWriterPrepareWrite prepare_write,
                                          LogicalOutputPluginWriterWrite do_write,