]> granicus.if.org Git - postgresql/commitdiff
Generic Messages for Logical Decoding
authorSimon Riggs <simon@2ndQuadrant.com>
Wed, 6 Apr 2016 09:05:41 +0000 (10:05 +0100)
committerSimon Riggs <simon@2ndQuadrant.com>
Wed, 6 Apr 2016 09:05:41 +0000 (10:05 +0100)
API and mechanism to allow generic messages to be inserted into WAL that are
intended to be read by logical decoding plugins. This commit adds an optional
new callback to the logical decoding API.

Messages are either text or bytea. Messages can be transactional, or not, and
are identified by a prefix to allow multiple concurrent decoding plugins.

(Not to be confused with Generic WAL records, which are intended to allow crash
recovery of extensible objects.)

Author: Petr Jelinek and Andres Freund
Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs
Discussion: 5685F999.6010202@2ndquadrant.com

27 files changed:
contrib/test_decoding/Makefile
contrib/test_decoding/expected/ddl.out
contrib/test_decoding/expected/messages.out [new file with mode: 0644]
contrib/test_decoding/sql/ddl.sql
contrib/test_decoding/sql/messages.sql [new file with mode: 0644]
contrib/test_decoding/test_decoding.c
doc/src/sgml/func.sgml
doc/src/sgml/logicaldecoding.sgml
src/backend/access/rmgrdesc/Makefile
src/backend/access/rmgrdesc/logicalmsgdesc.c [new file with mode: 0644]
src/backend/access/transam/rmgr.c
src/backend/replication/logical/Makefile
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/logical/message.c [new file with mode: 0644]
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/snapbuild.c
src/bin/pg_xlogdump/.gitignore
src/bin/pg_xlogdump/rmgrdesc.c
src/include/access/rmgrlist.h
src/include/catalog/pg_proc.h
src/include/replication/logicalfuncs.h
src/include/replication/message.h [new file with mode: 0644]
src/include/replication/output_plugin.h
src/include/replication/reorderbuffer.h
src/include/replication/snapbuild.h

index 06c95461f8a99352e37acb58c2f469a816c9869e..309cb0b39a3198acbd40c0bb82f76a08c6d8fed2 100644 (file)
@@ -38,7 +38,7 @@ submake-test_decoding:
        $(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
-       decoding_into_rel binary prepared replorigin time
+       decoding_into_rel binary prepared replorigin time messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
        $(MKDIR_P) regression_output
index 77719e8fed05035d566691e6dd0d2d4afbbb7af8..32cd24d6f0d3e97c9c0683f7c912bfeb16e3c674 100644 (file)
@@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 (7 rows)
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
+    ?column?    
+----------------
+ tx logical msg
+(1 row)
+
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
@@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data)
 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
 GROUP BY substring(data, 1, 24)
 ORDER BY 1,2;
- count |                       min                       |                                  max                                   
--------+-------------------------------------------------+------------------------------------------------------------------------
-     1 | BEGIN                                           | BEGIN
-     1 | COMMIT                                          | COMMIT
- 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999
-(3 rows)
+ count |                                  min                                  |                                  max                                   
+-------+-----------------------------------------------------------------------+------------------------------------------------------------------------
+     1 | BEGIN                                                                 | BEGIN
+     1 | COMMIT                                                                | COMMIT
+     1 | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg
+ 20467 | table public.tr_etoomuch: DELETE: id[integer]:1                       | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999
+(4 rows)
 
 -- check updates of primary keys work correctly
 BEGIN;
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
new file mode 100644 (file)
index 0000000..e26f793
--- /dev/null
@@ -0,0 +1,79 @@
+-- predictability
+SET synchronous_commit = on;
+SET client_encoding = 'utf8';
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+ ?column? 
+----------
+ msg1
+(1 row)
+
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+ ?column? 
+----------
+ msg2
+(1 row)
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+ ?column? 
+----------
+ msg3
+(1 row)
+
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ ?column? 
+----------
+ msg4
+(1 row)
+
+ROLLBACK;
+BEGIN;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+ ?column? 
+----------
+ msg5
+(1 row)
+
+SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6');
+ ?column? 
+----------
+ msg6
+(1 row)
+
+SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7');
+ ?column? 
+----------
+ msg7
+(1 row)
+
+COMMIT;
+SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň');
+   ?column?    
+---------------
+ žluťoučký kůň
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+                                 data                                 
+----------------------------------------------------------------------
+ message: transactional: 1 prefix: test, sz: 4 content:msg1
+ message: transactional: 0 prefix: test, sz: 4 content:msg2
+ message: transactional: 0 prefix: test, sz: 4 content:msg4
+ message: transactional: 0 prefix: test, sz: 4 content:msg6
+ message: transactional: 1 prefix: test, sz: 4 content:msg5
+ message: transactional: 1 prefix: test, sz: 4 content:msg7
+ message: transactional: 1 prefix: test, sz: 19 content:žluťoučký kůň
+(7 rows)
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ init
+(1 row)
+
index ad928ad572688e06a17ed352f4e6534078b69950..b1f7bf693a8fcc88b435e95b81b08b6ad37a2ab5 100644 (file)
@@ -108,11 +108,12 @@ DELETE FROM tr_pkey;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
new file mode 100644 (file)
index 0000000..d72191b
--- /dev/null
@@ -0,0 +1,25 @@
+-- predictability
+SET synchronous_commit = on;
+SET client_encoding = 'utf8';
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ROLLBACK;
+
+BEGIN;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6');
+SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7');
+COMMIT;
+
+SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň');
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
index 4cf808f28148264322339ef364d0fd2cc003da52..3336e1e16e7edeefa60754dc676bf7681d2075cb 100644 (file)
@@ -21,6 +21,7 @@
 
 #include "replication/output_plugin.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 
 #include "utils/builtins.h"
@@ -63,6 +64,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
                                 ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
                                 RepOriginId origin_id);
+static void pg_decode_message(LogicalDecodingContext *ctx,
+                                                         ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+                                                         bool transactional, const char *prefix,
+                                                         Size sz, const char *message);
 
 void
 _PG_init(void)
@@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->commit_cb = pg_decode_commit_txn;
        cb->filter_by_origin_cb = pg_decode_filter;
        cb->shutdown_cb = pg_decode_shutdown;
+       cb->message_cb = pg_decode_message;
 }
 
 
@@ -471,3 +477,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
        OutputPluginWrite(ctx, true);
 }
+
+static void
+pg_decode_message(LogicalDecodingContext *ctx,
+                                 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+                                 const char *prefix, Size sz, const char *message)
+{
+       OutputPluginPrepareWrite(ctx, true);
+       appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
+                                        transactional, prefix, sz);
+       appendBinaryStringInfo(ctx->out, message, sz);
+       OutputPluginWrite(ctx, true);
+}
index c6017e61aee71eb745631312618dc9c71b123b52..f60d5784fddd0f476d533d81b21311c8c3e997c1 100644 (file)
@@ -18255,6 +18255,51 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
        </entry>
       </row>
 
+      <row>
+       <entry id="pg-logical-emit-message-text">
+        <indexterm>
+         <primary>pg_logical_emit_message</primary>
+        </indexterm>
+        <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Emit text logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written immediately
+        and decoded as soon as the logical decoding reads the record. The
+        <parameter>prefix</parameter> is textual prefix used by the logical
+        decoding plugins to easily recognize interesting messages for them.
+        The <parameter>content</parameter> is the text of the message.
+       </entry>
+      </row>
+
+      <row>
+       <entry id="pg-logical-emit-message-bytea">
+        <indexterm>
+         <primary>>pg_logical_emit_message</primary>
+        </indexterm>
+        <literal><function>>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Emit binary logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written immediately
+        and decoded as soon as the logical decoding reads the record. The
+        <parameter>prefix</parameter> is textual prefix used by the logical
+        decoding plugins to easily recognize interesting messages for them.
+        The <parameter>content</parameter> is the binary content of the
+        message.
+       </entry>
+      </row>
+
      </tbody>
     </tgroup>
    </table>
index 45fdfeb13fd545f17e76422fad0c44f40946a0ba..8306d9ffaa0839b94a6b61b7c04e6781ca0de602 100644 (file)
@@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeBeginCB begin_cb;
     LogicalDecodeChangeCB change_cb;
     LogicalDecodeCommitCB commit_cb;
+    LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
@@ -602,6 +603,43 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (
        more efficient.
      </para>
      </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-message">
+     <title>Generic Message Callback</title>
+
+     <para>
+      The optional <function>message_cb</function> callback is called whenever
+      a logical decoding message has been decoded.
+<programlisting>
+typedef void (*LogicalDecodeMessageCB) (
+    struct LogicalDecodingContext *,
+    ReorderBufferTXN *txn,
+    XLogRecPtr message_lsn,
+    bool transactional,
+    const char *prefix,
+    Size message_size,
+    const char *message
+);
+</programlisting>
+      The <parameter>txn</parameter> parameter contains meta information about
+      the transaction, like the time stamp at which it has been committed and
+      its XID. Note however that it can be NULL when the message is
+      non-transactional and the XID was not assigned yet in the transaction
+      which logged the message. The <parameter>lsn</parameter> has WAL
+      position of the message. The <parameter>transactional</parameter> says
+      if the message was sent as transactional or not.
+      The <parameter>prefix</parameter> is arbitrary null-terminated prefix
+      which can be used for identifying interesting messages for the current
+      plugin. And finally the <parameter>message</parameter> parameter holds
+      the actual message of <parameter>message_size</parameter> size.
+     </para>
+     <para>
+      Extra care should be taken to ensure that the prefix the output plugin
+      considers interesting is unique. Using name of the extension or the
+      output plugin itself is often a good choice.
+     </para>
+    </sect3>
+
    </sect2>
 
    <sect2 id="logicaldecoding-output-plugin-output">
index c0e38fdf17d38211b0ddb1d9dabe0ee7918b3962..5514db1dda6ceaf95d3ef0ef37e66bddc82af420 100644 (file)
@@ -9,8 +9,8 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
-          gindesc.o gistdesc.o hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o \
-          relmapdesc.o replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
-          standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
+          gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \
+          mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \
+          smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicalmsgdesc.c
new file mode 100644 (file)
index 0000000..b194e14
--- /dev/null
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalmsgdesc.c
+ *       rmgr descriptor routines for replication/logical/message.c
+ *
+ * Portions Copyright (c) 2015-2016, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/message.h"
+
+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+       char       *rec = XLogRecGetData(record);
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       if (info == XLOG_LOGICAL_MESSAGE)
+       {
+               xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+               appendStringInfo(buf, "%s message size %zu bytes",
+                                  xlrec->transactional ? "transactional" : "nontransactional",
+                                                xlrec->message_size);
+       }
+}
+
+const char *
+logicalmsg_identify(uint8 info)
+{
+       if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
+               return "MESSAGE";
+
+       return NULL;
+}
index 7b38c16f521670f6cec3771429f6b4d55214ddc5..31c5fd165c0df07f53012eac39374a2f3ab5e4d3 100644 (file)
@@ -24,6 +24,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
index 8adea13bf4e065a86897e82e1f11f4b0a9c0b889..1d7ca062d11c63b1739a63b157d4f96e37aabde9 100644 (file)
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
+OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \
        snapbuild.o
 
 include $(top_srcdir)/src/backend/common.mk
index 7781ebcae0b7879937715ab024d3a66a8c1253e4..3e80c4a0d86efe8a86a843cafe90d7d0cd414248 100644 (file)
@@ -39,6 +39,7 @@
 
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/reorderbuffer.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
@@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -123,6 +125,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
                        DecodeHeapOp(ctx, &buf);
                        break;
 
+               case RM_LOGICALMSG_ID:
+                       DecodeLogicalMsgOp(ctx, &buf);
+                       break;
+
                        /*
                         * Rmgrs irrelevant for logical decoding; they describe stuff not
                         * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -458,6 +464,46 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
        }
 }
 
+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+       SnapBuild          *builder = ctx->snapshot_builder;
+       XLogReaderState *r = buf->record;
+       TransactionId   xid = XLogRecGetXid(r);
+       uint8                   info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+       Snapshot                snapshot;
+       xl_logical_message *message;
+
+       if (info != XLOG_LOGICAL_MESSAGE)
+               elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+
+       ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+       /* No point in doing anything yet. */
+       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+               return;
+
+       message = (xl_logical_message *) XLogRecGetData(r);
+
+       if (message->transactional &&
+               !SnapBuildProcessChange(builder, xid, buf->origptr))
+               return;
+       else if (!message->transactional &&
+                        (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+                         SnapBuildXactNeedsSkip(builder, buf->origptr)))
+               return;
+
+       snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+       ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
+                                                         message->transactional,
+                                                         message->message, /* first part of message is prefix */
+                                                         message->message_size,
+                                                         message->message + message->prefix_size);
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
index 2e6d3f9203c83032f9b2f237fbf71c6c23beef1c..c06b2fa28595044e4dcb545a028146d82dc48152 100644 (file)
@@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                  Relation relation, ReorderBufferChange *change);
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                 XLogRecPtr message_lsn, bool transactional,
+                                 const char *prefix, Size message_size, const char *message);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
@@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
        ctx->reorder->begin = begin_cb_wrapper;
        ctx->reorder->apply_change = change_cb_wrapper;
        ctx->reorder->commit = commit_cb_wrapper;
+       ctx->reorder->message = message_cb_wrapper;
 
        ctx->out = makeStringInfo();
        ctx->prepare_write = prepare_write;
@@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
        return ret;
 }
 
+static void
+message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                  XLogRecPtr message_lsn, bool transactional,
+                                  const char *prefix, Size message_size, const char *message)
+{
+       LogicalDecodingContext *ctx = cache->private_data;
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+
+       if (ctx->callbacks.message_cb == NULL)
+               return;
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "message";
+       state.report_location = message_lsn;
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = true;
+       ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+       ctx->write_location = message_lsn;
+
+       /* do the actual work: call callback */
+       ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
+                                                         message_size, message);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
index dd6cd62ccd1a351a33053ee54d43a4b582b5fbd7..69d20003a664406ffbec6933b935b38973bade66 100644 (file)
@@ -24,6 +24,8 @@
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 
+#include "access/xact.h"
+
 #include "catalog/pg_type.h"
 
 #include "nodes/makefuncs.h"
@@ -41,6 +43,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
+#include "replication/message.h"
 
 #include "storage/fd.h"
 
@@ -380,3 +383,27 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
 {
        return pg_logical_slot_get_changes_guts(fcinfo, false, true);
 }
+
+
+/*
+ * SQL function for writing logical decding message into WAL.
+ */
+Datum
+pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
+{
+       bool            transactional = PG_GETARG_BOOL(0);
+       char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
+       bytea      *data = PG_GETARG_BYTEA_PP(2);
+       XLogRecPtr      lsn;
+
+       lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
+                                                       transactional);
+       PG_RETURN_LSN(lsn);
+}
+
+Datum
+pg_logical_emit_message_text(PG_FUNCTION_ARGS)
+{
+       /* bytea and text are compatible */
+       return pg_logical_emit_message_bytea(fcinfo);
+}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
new file mode 100644 (file)
index 0000000..684f799
--- /dev/null
@@ -0,0 +1,87 @@
+/*-------------------------------------------------------------------------
+ *
+ * message.c
+ *       Generic logical messages.
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/message.c
+ *
+ * NOTES
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back but the
+ * non-transactional one will be delivered always.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/message.h"
+#include "replication/logical.h"
+
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+                                 bool transactional)
+{
+       xl_logical_message      xlrec;
+
+       /*
+        * Force xid to be allocated if we're emitting a transactional message.
+        */
+       if (transactional)
+       {
+               Assert(IsTransactionState());
+               GetCurrentTransactionId();
+       }
+
+       xlrec.transactional = transactional;
+       xlrec.prefix_size = strlen(prefix) + 1;
+       xlrec.message_size = size;
+
+       XLogBeginInsert();
+       XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+       XLogRegisterData((char *) prefix, xlrec.prefix_size);
+       XLogRegisterData((char *) message, size);
+
+       return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding messages.
+ */
+void
+logicalmsg_redo(XLogReaderState *record)
+{
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       if (info != XLOG_LOGICAL_MESSAGE)
+                       elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+
+       /* This is only interesting for logical decoding, see decode.c. */
+}
index 9d78c8c134ea457ce160d55a3080dec0600b969c..52c6986dc0c47871538d311c28b09e7261d00a98 100644 (file)
@@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
                                change->data.tp.oldtuple = NULL;
                        }
                        break;
+               case REORDER_BUFFER_CHANGE_MESSAGE:
+                       if (change->data.msg.prefix != NULL)
+                               pfree(change->data.msg.prefix);
+                       change->data.msg.prefix = NULL;
+                       if (change->data.msg.message != NULL)
+                               pfree(change->data.msg.message);
+                       change->data.msg.message = NULL;
+                       break;
                case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
                        if (change->data.snapshot)
                        {
@@ -627,6 +635,61 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
        ReorderBufferCheckSerializeTXN(rb, txn);
 }
 
+/*
+ * Queue message into a transaction so it can be processed upon commit.
+ */
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
+                                                 Snapshot snapshot, XLogRecPtr lsn,
+                                                 bool transactional, const char *prefix,
+                                                 Size message_size, const char *message)
+{
+       if (transactional)
+       {
+               MemoryContext oldcontext;
+               ReorderBufferChange *change;
+
+               Assert(xid != InvalidTransactionId);
+
+               oldcontext = MemoryContextSwitchTo(rb->context);
+
+               change = ReorderBufferGetChange(rb);
+               change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+               change->data.msg.prefix = pstrdup(prefix);
+               change->data.msg.message_size = message_size;
+               change->data.msg.message = palloc(message_size);
+               memcpy(change->data.msg.message, message, message_size);
+
+               ReorderBufferQueueChange(rb, xid, lsn, change);
+
+               MemoryContextSwitchTo(oldcontext);
+       }
+       else
+       {
+               ReorderBufferTXN   *txn = NULL;
+               volatile Snapshot       snapshot_now = snapshot;
+
+               if (xid != InvalidTransactionId)
+                       txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+               /* setup snapshot to allow catalog access */
+               SetupHistoricSnapshot(snapshot_now, NULL);
+               PG_TRY();
+               {
+                       rb->message(rb, txn, lsn, false, prefix, message_size, message);
+
+                       TeardownHistoricSnapshot(false);
+               }
+               PG_CATCH();
+               {
+                       TeardownHistoricSnapshot(true);
+                       PG_RE_THROW();
+               }
+               PG_END_TRY();
+       }
+}
+
+
 static void
 AssertTXNLsnOrder(ReorderBuffer *rb)
 {
@@ -1493,6 +1556,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                                        specinsert = change;
                                        break;
 
+                               case REORDER_BUFFER_CHANGE_MESSAGE:
+                                       rb->message(rb, txn, change->lsn, true,
+                                                               change->data.msg.prefix,
+                                                               change->data.msg.message_size,
+                                                               change->data.msg.message);
+                                       break;
+
                                case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
                                        /* get rid of the old */
                                        TeardownHistoricSnapshot(false);
@@ -2157,6 +2227,33 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                        memcpy(data, newtup->tuple.t_data, newlen);
                                        data += newlen;
                                }
+                               break;
+                       }
+               case REORDER_BUFFER_CHANGE_MESSAGE:
+                       {
+                               char       *data;
+                               Size            prefix_size = strlen(change->data.msg.prefix) + 1;
+
+                               sz += prefix_size + change->data.msg.message_size +
+                                       sizeof(Size) + sizeof(Size);
+                               ReorderBufferSerializeReserve(rb, sz);
+
+                               data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+                               /* write the prefix including the size */
+                               memcpy(data, &prefix_size, sizeof(Size));
+                               data += sizeof(Size);
+                               memcpy(data, change->data.msg.prefix,
+                                          prefix_size);
+                               data += prefix_size;
+
+                               /* write the message including the size */
+                               memcpy(data, &change->data.msg.message_size, sizeof(Size));
+                               data += sizeof(Size);
+                               memcpy(data, change->data.msg.message,
+                                          change->data.msg.message_size);
+                               data += change->data.msg.message_size;
+
                                break;
                        }
                case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
@@ -2415,6 +2512,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                        }
 
                        break;
+               case REORDER_BUFFER_CHANGE_MESSAGE:
+                       {
+                               Size            prefix_size;
+
+                               /* read prefix */
+                               memcpy(&prefix_size, data, sizeof(Size));
+                               data += sizeof(Size);
+                               change->data.msg.prefix = MemoryContextAlloc(rb->context,
+                                                                                                                        prefix_size);
+                               memcpy(change->data.msg.prefix, data, prefix_size);
+                               Assert(change->data.msg.prefix[prefix_size-1] == '\0');
+                               data += prefix_size;
+
+                               /* read the messsage */
+                               memcpy(&change->data.msg.message_size, data, sizeof(Size));
+                               data += sizeof(Size);
+                               change->data.msg.message = MemoryContextAlloc(rb->context,
+                                                                                               change->data.msg.message_size);
+                               memcpy(change->data.msg.message, data,
+                                          change->data.msg.message_size);
+                               data += change->data.msg.message_size;
+
+                               break;
+                       }
                case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
                        {
                                Snapshot        oldsnap;
index 179b85a4161a9e13bdb22a9b15138f06f88949af..b4dc617a8c974e5d4ab719806dadc861e6e23818 100644 (file)
@@ -604,6 +604,25 @@ SnapBuildExportSnapshot(SnapBuild *builder)
        return snapname;
 }
 
+/*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+       Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+       /* only build a new snapshot if we don't have a prebuilt one */
+       if (builder->snapshot == NULL)
+       {
+               builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+               /* inrease refcount for the snapshot builder */
+               SnapBuildSnapIncRefcount(builder->snapshot);
+       }
+
+       return builder->snapshot;
+}
+
 /*
  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
  * any. Aborts the previously started transaction and resets the resource
index 33a1acfd2cb04f3a0c461ad52f049dabe443d264..c4783f12bb5c29709c5629c06d824b26864b99b0 100644 (file)
@@ -1,23 +1,4 @@
 /pg_xlogdump
 # Source files copied from src/backend/access/rmgrdesc/
-/brindesc.c
-/clogdesc.c
-/committsdesc.c
-/dbasedesc.c
-/genericdesc.c
-/gindesc.c
-/gistdesc.c
-/hashdesc.c
-/heapdesc.c
-/mxactdesc.c
-/nbtdesc.c
-/relmapdesc.c
-/replorigindesc.c
-/seqdesc.c
-/smgrdesc.c
-/spgdesc.c
-/standbydesc.c
-/tblspcdesc.c
-/xactdesc.c
-/xlogdesc.c
+/*desc.c
 /xlogreader.c
index cff7e59f34a2fd01cd522369a994386cfc90d97f..017b9c5b345d84605d71bee748d19b6990b0a684 100644 (file)
@@ -26,6 +26,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
index 3cfe6f7b5469dc19753016f4fd0232f09d651456..a7a0ae224fde4a8496cf8678ac6e7e2e8f5b42e2 100644 (file)
@@ -46,3 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
index a0edf939b3a46332adf7a790473e83c37fe9a90f..6eab6c7c33419467f01c80e3b5be2be6fc5d02db 100644 (file)
@@ -5155,6 +5155,10 @@ DATA(insert OID = 3784 (  pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
 DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 (  pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
 DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3577 (  pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ ));
+DESCR("emit a textual logical decoding message");
+DATA(insert OID = 3578 (  pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ ));
+DESCR("emit a binary logical decoding message");
 
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects             PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
index c87a1dfebdec02e063f91e3c2319dd71accee6af..554041405c426a2192c90cd9a351ea97a5791018 100644 (file)
@@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
 
+extern Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS);
 #endif
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
new file mode 100644 (file)
index 0000000..8b968d5
--- /dev/null
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ * message.h
+ *        Exports from replication/logical/message.c
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/message.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+       bool            transactional;                                  /* is message transactional? */
+       Size            prefix_size;                                    /* length of prefix */
+       Size            message_size;                                   /* size of the message */
+       char            message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null
+                                                                                                * terminated prefix of length
+                                                                                                * prefix_size */
+} xl_logical_message;
+
+#define SizeOfLogicalMessage   (offsetof(xl_logical_message, message))
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+                                                                       size_t size, bool transactional);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE   0x00
+void           logicalmsg_redo(XLogReaderState *record);
+void           logicalmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalmsg_identify(uint8 info);
+
+#endif   /* PG_LOGICAL_MESSAGE_H */
index 577b12ea2ef6d1aba539c720248c77e2234edf76..3a2ca985fbe138018d220be18a70d368f6781d46 100644 (file)
@@ -73,6 +73,18 @@ typedef void (*LogicalDecodeCommitCB) (
                                                                                                   ReorderBufferTXN *txn,
                                                                                                   XLogRecPtr commit_lsn);
 
+/*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (
+                                                                                        struct LogicalDecodingContext *,
+                                                                                        ReorderBufferTXN *txn,
+                                                                                        XLogRecPtr message_lsn,
+                                                                                        bool transactional,
+                                                                                        const char *prefix,
+                                                                                        Size message_size,
+                                                                                        const char *message);
+
 /*
  * Filter changes by origin.
  */
@@ -96,6 +108,7 @@ typedef struct OutputPluginCallbacks
        LogicalDecodeBeginCB begin_cb;
        LogicalDecodeChangeCB change_cb;
        LogicalDecodeCommitCB commit_cb;
+       LogicalDecodeMessageCB message_cb;
        LogicalDecodeFilterByOriginCB filter_by_origin_cb;
        LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
index b52d06af92806ef376959fb4dd8687dde2650f6e..4c54953a5127c52ed1f48e13943d37b063fc6a9a 100644 (file)
@@ -54,6 +54,7 @@ enum ReorderBufferChangeType
        REORDER_BUFFER_CHANGE_INSERT,
        REORDER_BUFFER_CHANGE_UPDATE,
        REORDER_BUFFER_CHANGE_DELETE,
+       REORDER_BUFFER_CHANGE_MESSAGE,
        REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
        REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
        REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -98,6 +99,14 @@ typedef struct ReorderBufferChange
                        ReorderBufferTupleBuf *newtuple;
                }                       tp;
 
+               /* Message with arbitrary data. */
+               struct
+               {
+                       char   *prefix;
+                       Size    message_size;
+                       char   *message;
+               } msg;
+
                /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
                Snapshot        snapshot;
 
@@ -274,6 +283,15 @@ typedef void (*ReorderBufferCommitCB) (
                                                                                                   ReorderBufferTXN *txn,
                                                                                                   XLogRecPtr commit_lsn);
 
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (
+                                                                                                  ReorderBuffer *rb,
+                                                                                                  ReorderBufferTXN *txn,
+                                                                                                  XLogRecPtr message_lsn,
+                                                                                                  bool transactional,
+                                                                                                  const char *prefix, Size sz,
+                                                                                                  const char *message);
+
 struct ReorderBuffer
 {
        /*
@@ -300,6 +318,7 @@ struct ReorderBuffer
        ReorderBufferBeginCB begin;
        ReorderBufferApplyChangeCB apply_change;
        ReorderBufferCommitCB commit;
+       ReorderBufferMessageCB message;
 
        /*
         * Pointer that will be passed untouched to the callbacks.
@@ -350,6 +369,9 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void           ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
 void           ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void           ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
+                                                                         bool transactional, const char *prefix,
+                                                                         Size message_size, const char *message);
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
                                        XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
          TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
index 75955afd391b7286e2d67424dc1e2556c31538d9..c4127a1cf75dbd70f070cbb8e1df59fe5a7a0638 100644 (file)
@@ -63,6 +63,8 @@ extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
 extern void SnapBuildClearExportedSnapshot(void);
 
 extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
+extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
+                                                                                       TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);