$(MAKE) -C $(top_builddir)/contrib/test_decoding
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
- decoding_into_rel binary prepared replorigin time
+ decoding_into_rel binary prepared replorigin time messages
regresscheck: | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output
(7 rows)
/*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
*/
BEGIN;
CREATE TABLE tr_etoomuch (id serial primary key, data int);
INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
+ ?column?
+----------------
+ tx logical msg
+(1 row)
+
DELETE FROM tr_etoomuch WHERE id < 5000;
UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
COMMIT;
FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
GROUP BY substring(data, 1, 24)
ORDER BY 1,2;
- count | min | max
--------+-------------------------------------------------+------------------------------------------------------------------------
- 1 | BEGIN | BEGIN
- 1 | COMMIT | COMMIT
- 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999
-(3 rows)
+ count | min | max
+-------+-----------------------------------------------------------------------+------------------------------------------------------------------------
+ 1 | BEGIN | BEGIN
+ 1 | COMMIT | COMMIT
+ 1 | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg
+ 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999
+(4 rows)
-- check updates of primary keys work correctly
BEGIN;
--- /dev/null
+-- predictability
+SET synchronous_commit = on;
+SET client_encoding = 'utf8';
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+ ?column?
+----------
+ msg1
+(1 row)
+
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+ ?column?
+----------
+ msg2
+(1 row)
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+ ?column?
+----------
+ msg3
+(1 row)
+
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ ?column?
+----------
+ msg4
+(1 row)
+
+ROLLBACK;
+BEGIN;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+ ?column?
+----------
+ msg5
+(1 row)
+
+SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6');
+ ?column?
+----------
+ msg6
+(1 row)
+
+SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7');
+ ?column?
+----------
+ msg7
+(1 row)
+
+COMMIT;
+SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň');
+ ?column?
+---------------
+ žluťoučký kůň
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------------------------
+ message: transactional: 1 prefix: test, sz: 4 content:msg1
+ message: transactional: 0 prefix: test, sz: 4 content:msg2
+ message: transactional: 0 prefix: test, sz: 4 content:msg4
+ message: transactional: 0 prefix: test, sz: 4 content:msg6
+ message: transactional: 1 prefix: test, sz: 4 content:msg5
+ message: transactional: 1 prefix: test, sz: 4 content:msg7
+ message: transactional: 1 prefix: test, sz: 19 content:žluťoučký kůň
+(7 rows)
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+ ?column?
+----------
+ init
+(1 row)
+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
*/
BEGIN;
CREATE TABLE tr_etoomuch (id serial primary key, data int);
INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
DELETE FROM tr_etoomuch WHERE id < 5000;
UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
COMMIT;
--- /dev/null
+-- predictability
+SET synchronous_commit = on;
+SET client_encoding = 'utf8';
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ROLLBACK;
+
+BEGIN;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6');
+SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7');
+COMMIT;
+
+SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň');
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
#include "replication/output_plugin.h"
#include "replication/logical.h"
+#include "replication/message.h"
#include "replication/origin.h"
#include "utils/builtins.h"
ReorderBufferChange *change);
static bool pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static void pg_decode_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
void
_PG_init(void)
cb->commit_cb = pg_decode_commit_txn;
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
+ cb->message_cb = pg_decode_message;
}
OutputPluginWrite(ctx, true);
}
+
+static void
+pg_decode_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+ const char *prefix, Size sz, const char *message)
+{
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
+ transactional, prefix, sz);
+ appendBinaryStringInfo(ctx->out, message, sz);
+ OutputPluginWrite(ctx, true);
+}
</entry>
</row>
+ <row>
+ <entry id="pg-logical-emit-message-text">
+ <indexterm>
+ <primary>pg_logical_emit_message</primary>
+ </indexterm>
+ <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
+ </entry>
+ <entry>
+ void
+ </entry>
+ <entry>
+ Emit text logical decoding message. This can be used to pass generic
+ messages to logical decoding plugins through WAL. The parameter
+ <parameter>transactional</parameter> specifies if the message should
+ be part of current transaction or if it should be written immediately
+ and decoded as soon as the logical decoding reads the record. The
+ <parameter>prefix</parameter> is textual prefix used by the logical
+ decoding plugins to easily recognize interesting messages for them.
+ The <parameter>content</parameter> is the text of the message.
+ </entry>
+ </row>
+
+ <row>
+ <entry id="pg-logical-emit-message-bytea">
+ <indexterm>
+ <primary>>pg_logical_emit_message</primary>
+ </indexterm>
+ <literal><function>>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type>)</function></literal>
+ </entry>
+ <entry>
+ void
+ </entry>
+ <entry>
+ Emit binary logical decoding message. This can be used to pass generic
+ messages to logical decoding plugins through WAL. The parameter
+ <parameter>transactional</parameter> specifies if the message should
+ be part of current transaction or if it should be written immediately
+ and decoded as soon as the logical decoding reads the record. The
+ <parameter>prefix</parameter> is textual prefix used by the logical
+ decoding plugins to easily recognize interesting messages for them.
+ The <parameter>content</parameter> is the binary content of the
+ message.
+ </entry>
+ </row>
+
</tbody>
</tgroup>
</table>
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
more efficient.
</para>
</sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-message">
+ <title>Generic Message Callback</title>
+
+ <para>
+ The optional <function>message_cb</function> callback is called whenever
+ a logical decoding message has been decoded.
+<programlisting>
+typedef void (*LogicalDecodeMessageCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message
+);
+</programlisting>
+ The <parameter>txn</parameter> parameter contains meta information about
+ the transaction, like the time stamp at which it has been committed and
+ its XID. Note however that it can be NULL when the message is
+ non-transactional and the XID was not assigned yet in the transaction
+ which logged the message. The <parameter>lsn</parameter> has WAL
+ position of the message. The <parameter>transactional</parameter> says
+ if the message was sent as transactional or not.
+ The <parameter>prefix</parameter> is arbitrary null-terminated prefix
+ which can be used for identifying interesting messages for the current
+ plugin. And finally the <parameter>message</parameter> parameter holds
+ the actual message of <parameter>message_size</parameter> size.
+ </para>
+ <para>
+ Extra care should be taken to ensure that the prefix the output plugin
+ considers interesting is unique. Using name of the extension or the
+ output plugin itself is often a good choice.
+ </para>
+ </sect3>
+
</sect2>
<sect2 id="logicaldecoding-output-plugin-output">
include $(top_builddir)/src/Makefile.global
OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
- gindesc.o gistdesc.o hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o \
- relmapdesc.o replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
- standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
+ gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \
+ mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \
+ smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * logicalmsgdesc.c
+ * rmgr descriptor routines for replication/logical/message.c
+ *
+ * Portions Copyright (c) 2015-2016, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/message.h"
+
+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info == XLOG_LOGICAL_MESSAGE)
+ {
+ xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+ appendStringInfo(buf, "%s message size %zu bytes",
+ xlrec->transactional ? "transactional" : "nontransactional",
+ xlrec->message_size);
+ }
+}
+
+const char *
+logicalmsg_identify(uint8 info)
+{
+ if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
+ return "MESSAGE";
+
+ return NULL;
+}
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
+#include "replication/message.h"
#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/relmapper.h"
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
+OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \
snapbuild.o
include $(top_srcdir)/src/backend/common.mk
#include "replication/decode.h"
#include "replication/logical.h"
+#include "replication/message.h"
#include "replication/reorderbuffer.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
DecodeHeapOp(ctx, &buf);
break;
+ case RM_LOGICALMSG_ID:
+ DecodeLogicalMsgOp(ctx, &buf);
+ break;
+
/*
* Rmgrs irrelevant for logical decoding; they describe stuff not
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
}
}
+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogReaderState *r = buf->record;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ Snapshot snapshot;
+ xl_logical_message *message;
+
+ if (info != XLOG_LOGICAL_MESSAGE)
+ elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+ /* No point in doing anything yet. */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ return;
+
+ message = (xl_logical_message *) XLogRecGetData(r);
+
+ if (message->transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!message->transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
+ message->transactional,
+ message->message, /* first part of message is prefix */
+ message->message_size,
+ message->message + message->prefix_size);
+}
+
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message);
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
ctx->reorder->begin = begin_cb_wrapper;
ctx->reorder->apply_change = change_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
+ ctx->reorder->message = message_cb_wrapper;
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
return ret;
}
+static void
+message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ if (ctx->callbacks.message_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "message";
+ state.report_location = message_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = message_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
+ message_size, message);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
+#include "access/xact.h"
+
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
+#include "replication/message.h"
#include "storage/fd.h"
{
return pg_logical_slot_get_changes_guts(fcinfo, false, true);
}
+
+
+/*
+ * SQL function for writing logical decding message into WAL.
+ */
+Datum
+pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
+{
+ bool transactional = PG_GETARG_BOOL(0);
+ char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ bytea *data = PG_GETARG_BYTEA_PP(2);
+ XLogRecPtr lsn;
+
+ lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
+ transactional);
+ PG_RETURN_LSN(lsn);
+}
+
+Datum
+pg_logical_emit_message_text(PG_FUNCTION_ARGS)
+{
+ /* bytea and text are compatible */
+ return pg_logical_emit_message_bytea(fcinfo);
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * message.c
+ * Generic logical messages.
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/message.c
+ *
+ * NOTES
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back but the
+ * non-transactional one will be delivered always.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/message.h"
+#include "replication/logical.h"
+
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+ bool transactional)
+{
+ xl_logical_message xlrec;
+
+ /*
+ * Force xid to be allocated if we're emitting a transactional message.
+ */
+ if (transactional)
+ {
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+ }
+
+ xlrec.transactional = transactional;
+ xlrec.prefix_size = strlen(prefix) + 1;
+ xlrec.message_size = size;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+ XLogRegisterData((char *) prefix, xlrec.prefix_size);
+ XLogRegisterData((char *) message, size);
+
+ return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding messages.
+ */
+void
+logicalmsg_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info != XLOG_LOGICAL_MESSAGE)
+ elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+
+ /* This is only interesting for logical decoding, see decode.c. */
+}
change->data.tp.oldtuple = NULL;
}
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ if (change->data.msg.prefix != NULL)
+ pfree(change->data.msg.prefix);
+ change->data.msg.prefix = NULL;
+ if (change->data.msg.message != NULL)
+ pfree(change->data.msg.message);
+ change->data.msg.message = NULL;
+ break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
if (change->data.snapshot)
{
ReorderBufferCheckSerializeTXN(rb, txn);
}
+/*
+ * Queue message into a transaction so it can be processed upon commit.
+ */
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix,
+ Size message_size, const char *message)
+{
+ if (transactional)
+ {
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
+
+ Assert(xid != InvalidTransactionId);
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ change = ReorderBufferGetChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+ change->data.msg.prefix = pstrdup(prefix);
+ change->data.msg.message_size = message_size;
+ change->data.msg.message = palloc(message_size);
+ memcpy(change->data.msg.message, message, message_size);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ ReorderBufferTXN *txn = NULL;
+ volatile Snapshot snapshot_now = snapshot;
+
+ if (xid != InvalidTransactionId)
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ /* setup snapshot to allow catalog access */
+ SetupHistoricSnapshot(snapshot_now, NULL);
+ PG_TRY();
+ {
+ rb->message(rb, txn, lsn, false, prefix, message_size, message);
+
+ TeardownHistoricSnapshot(false);
+ }
+ PG_CATCH();
+ {
+ TeardownHistoricSnapshot(true);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+}
+
+
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
specinsert = change;
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ rb->message(rb, txn, change->lsn, true,
+ change->data.msg.prefix,
+ change->data.msg.message_size,
+ change->data.msg.message);
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
/* get rid of the old */
TeardownHistoricSnapshot(false);
memcpy(data, newtup->tuple.t_data, newlen);
data += newlen;
}
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ {
+ char *data;
+ Size prefix_size = strlen(change->data.msg.prefix) + 1;
+
+ sz += prefix_size + change->data.msg.message_size +
+ sizeof(Size) + sizeof(Size);
+ ReorderBufferSerializeReserve(rb, sz);
+
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+ /* write the prefix including the size */
+ memcpy(data, &prefix_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.msg.prefix,
+ prefix_size);
+ data += prefix_size;
+
+ /* write the message including the size */
+ memcpy(data, &change->data.msg.message_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.msg.message,
+ change->data.msg.message_size);
+ data += change->data.msg.message_size;
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
}
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ {
+ Size prefix_size;
+
+ /* read prefix */
+ memcpy(&prefix_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.msg.prefix = MemoryContextAlloc(rb->context,
+ prefix_size);
+ memcpy(change->data.msg.prefix, data, prefix_size);
+ Assert(change->data.msg.prefix[prefix_size-1] == '\0');
+ data += prefix_size;
+
+ /* read the messsage */
+ memcpy(&change->data.msg.message_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.msg.message = MemoryContextAlloc(rb->context,
+ change->data.msg.message_size);
+ memcpy(change->data.msg.message, data,
+ change->data.msg.message_size);
+ data += change->data.msg.message_size;
+
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot oldsnap;
return snapname;
}
+/*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+ Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+ /* only build a new snapshot if we don't have a prebuilt one */
+ if (builder->snapshot == NULL)
+ {
+ builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ /* inrease refcount for the snapshot builder */
+ SnapBuildSnapIncRefcount(builder->snapshot);
+ }
+
+ return builder->snapshot;
+}
+
/*
* Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
* any. Aborts the previously started transaction and resets the resource
/pg_xlogdump
# Source files copied from src/backend/access/rmgrdesc/
-/brindesc.c
-/clogdesc.c
-/committsdesc.c
-/dbasedesc.c
-/genericdesc.c
-/gindesc.c
-/gistdesc.c
-/hashdesc.c
-/heapdesc.c
-/mxactdesc.c
-/nbtdesc.c
-/relmapdesc.c
-/replorigindesc.c
-/seqdesc.c
-/smgrdesc.c
-/spgdesc.c
-/standbydesc.c
-/tblspcdesc.c
-/xactdesc.c
-/xlogdesc.c
+/*desc.c
/xlogreader.c
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
+#include "replication/message.h"
#include "replication/origin.h"
#include "rmgrdesc.h"
#include "storage/standbydefs.h"
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
DESCR("peek at changes from replication slot");
DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3577 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ ));
+DESCR("emit a textual logical decoding message");
+DATA(insert OID = 3578 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ ));
+DESCR("emit a binary logical decoding message");
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS);
#endif
--- /dev/null
+/*-------------------------------------------------------------------------
+ * message.h
+ * Exports from replication/logical/message.c
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/message.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+ bool transactional; /* is message transactional? */
+ Size prefix_size; /* length of prefix */
+ Size message_size; /* size of the message */
+ char message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null
+ * terminated prefix of length
+ * prefix_size */
+} xl_logical_message;
+
+#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+ size_t size, bool transactional);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE 0x00
+void logicalmsg_redo(XLogReaderState *record);
+void logicalmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalmsg_identify(uint8 info);
+
+#endif /* PG_LOGICAL_MESSAGE_H */
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+/*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
/*
* Filter changes by origin.
*/
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_UPDATE,
REORDER_BUFFER_CHANGE_DELETE,
+ REORDER_BUFFER_CHANGE_MESSAGE,
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
ReorderBufferTupleBuf *newtuple;
} tp;
+ /* Message with arbitrary data. */
+ struct
+ {
+ char *prefix;
+ Size message_size;
+ char *message;
+ } msg;
+
/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
Snapshot snapshot;
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix, Size sz,
+ const char *message);
+
struct ReorderBuffer
{
/*
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferCommitCB commit;
+ ReorderBufferMessageCB message;
/*
* Pointer that will be passed untouched to the callbacks.
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix,
+ Size message_size, const char *message);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
extern void SnapBuildClearExportedSnapshot(void);
extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
+extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
+ TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);