From: Andres Freund Date: Thu, 14 Apr 2016 00:38:54 +0000 (-0700) Subject: Add required database and origin filtering for logical messages. X-Git-Tag: REL9_6_BETA1~167 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=be65eddd80093a923b091dc60776aa6f966d1f07;p=postgresql Add required database and origin filtering for logical messages. Logical messages, added in 3fe3511d05, during decoding failed to filter messages emitted in other databases and messages emitted "under" a replication origin the output plugin isn't interested in. Add tests to verify that both types of filtering actually work. While touching message.sql remove hunk obsoleted by d25379e. Bump XLOG_PAGE_MAGIC because xl_logical_message changed and because 3fe3511d05 had omitted doing so. 3fe3511d05 additionally didn't bump catversion, but 7a542700d has done so since. Author: Petr Jelinek Reported-By: Andres Freund Discussion: 20160406142513.wotqy3ba3kanr423@alap3.anarazel.de --- diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out index 70130e9378..c75d40190b 100644 --- a/contrib/test_decoding/expected/messages.out +++ b/contrib/test_decoding/expected/messages.out @@ -1,6 +1,5 @@ -- predictability SET synchronous_commit = on; -SET client_encoding = 'utf8'; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); ?column? ---------- @@ -71,9 +70,30 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for message: transactional: 1 prefix: test, sz: 11 content:czechtastic (7 rows) -SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); +-- test db filtering +\set prevdb :DBNAME +\c template1 +SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1'); ?column? ---------- - init + otherdb1 +(1 row) + +SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2'); + ?column? +---------- + otherdb2 +(1 row) + +\c :prevdb +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + cleanup (1 row) diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out index c0f512579c..76d4ea986d 100644 --- a/contrib/test_decoding/expected/replorigin.out +++ b/contrib/test_decoding/expected/replorigin.out @@ -59,6 +59,12 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); -- ensure we prevent duplicate setup SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); ERROR: cannot setup replication origin when one is already setup +SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded'); + ?column? +---------- + +(1 row) + BEGIN; -- setup transaction origin SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00'); diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql index 4aedb04560..cf3f7738e5 100644 --- a/contrib/test_decoding/sql/messages.sql +++ b/contrib/test_decoding/sql/messages.sql @@ -1,6 +1,5 @@ -- predictability SET synchronous_commit = on; -SET client_encoding = 'utf8'; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); @@ -22,4 +21,14 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); -SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); +-- test db filtering +\set prevdb :DBNAME +\c template1 + +SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1'); +SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2'); + +\c :prevdb +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + +SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql index e12404e106..7870f0ea32 100644 --- a/contrib/test_decoding/sql/replorigin.sql +++ b/contrib/test_decoding/sql/replorigin.sql @@ -31,6 +31,8 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); -- ensure we prevent duplicate setup SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); +SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded'); + BEGIN; -- setup transaction origin SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00'); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 3e80c4a0d8..0cdb0b8a92 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +static inline bool +FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) +{ + if (ctx->callbacks.filter_by_origin_cb == NULL) + return false; + + return filter_by_origin_cb_wrapper(ctx, origin_id); +} + /* * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). */ @@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + RepOriginId origin_id = XLogRecGetOrigin(r); Snapshot snapshot; xl_logical_message *message; @@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message = (xl_logical_message *) XLogRecGetData(r); + if (message->dbId != ctx->slot->data.database || + FilterByOrigin(ctx, origin_id)) + return; + if (message->transactional && !SnapBuildProcessChange(builder, xid, buf->origptr)) return; @@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message->message + message->prefix_size); } -static inline bool -FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) -{ - if (ctx->callbacks.filter_by_origin_cb == NULL) - return false; - - return filter_by_origin_cb_wrapper(ctx, origin_id); -} - /* * Consolidated commit record handling between the different form of commit * records. diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 684f799826..efcc25ae95 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -31,6 +31,8 @@ #include "postgres.h" +#include "miscadmin.h" + #include "access/xact.h" #include "catalog/indexing.h" @@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, GetCurrentTransactionId(); } + xlrec.dbId = MyDatabaseId; xlrec.transactional = transactional; xlrec.prefix_size = strlen(prefix) + 1; xlrec.message_size = size; @@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, XLogRegisterData((char *) prefix, xlrec.prefix_size); XLogRegisterData((char *) message, size); + /* allow origin filtering */ + XLogIncludeOrigin(); + return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); } diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 749060f9c7..7089a1c48f 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -31,7 +31,7 @@ /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD090 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD091 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { diff --git a/src/include/replication/message.h b/src/include/replication/message.h index 8b968d5288..23b9cdb268 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -19,6 +19,7 @@ */ typedef struct xl_logical_message { + Oid dbId; /* database Oid emitted from */ bool transactional; /* is message transactional? */ Size prefix_size; /* length of prefix */ Size message_size; /* size of the message */