]> granicus.if.org Git - postgresql/commitdiff
Add required database and origin filtering for logical messages.
authorAndres Freund <andres@anarazel.de>
Thu, 14 Apr 2016 00:38:54 +0000 (17:38 -0700)
committerAndres Freund <andres@anarazel.de>
Thu, 14 Apr 2016 00:38:54 +0000 (17:38 -0700)
Logical messages, added in 3fe3511d05, during decoding failed to filter
messages emitted in other databases and messages emitted "under" a
replication origin the output plugin isn't interested in.

Add tests to verify that both types of filtering actually work. While
touching message.sql remove hunk obsoleted by d25379e.

Bump XLOG_PAGE_MAGIC because xl_logical_message changed and because
3fe3511d05 had omitted doing so. 3fe3511d05 additionally didn't bump
catversion, but 7a542700d has done so since.

Author: Petr Jelinek
Reported-By: Andres Freund
Discussion: 20160406142513.wotqy3ba3kanr423@alap3.anarazel.de

contrib/test_decoding/expected/messages.out
contrib/test_decoding/expected/replorigin.out
contrib/test_decoding/sql/messages.sql
contrib/test_decoding/sql/replorigin.sql
src/backend/replication/logical/decode.c
src/backend/replication/logical/message.c
src/include/access/xlog_internal.h
src/include/replication/message.h

index 70130e93789887cc84028cc0b732a96324816eb2..c75d40190b6e990757babeefc8e4d6450b2bb0f2 100644 (file)
@@ -1,6 +1,5 @@
 -- predictability
 SET synchronous_commit = on;
-SET client_encoding = 'utf8';
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
  ?column? 
 ----------
@@ -71,9 +70,30 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
 (7 rows)
 
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
  ?column? 
 ----------
- init
+ otherdb1
+(1 row)
+
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+ ?column? 
+----------
+ otherdb2
+(1 row)
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ cleanup
 (1 row)
 
index c0f512579ca90bed5becf46e448780a32642dd52..76d4ea986dbc0f9e8a4f88859c1136298c68ef50 100644 (file)
@@ -59,6 +59,12 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 -- ensure we prevent duplicate setup
 SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 ERROR:  cannot setup replication origin when one is already setup
+SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+ ?column? 
+----------
+(1 row)
+
 BEGIN;
 -- setup transaction origin
 SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
index 4aedb045601d7441bf267a56d4a4aa87f2aa76d5..cf3f7738e57880ece0773269ae878e00b20ebe93 100644 (file)
@@ -1,6 +1,5 @@
 -- predictability
 SET synchronous_commit = on;
-SET client_encoding = 'utf8';
 
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
@@ -22,4 +21,14 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
 
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
index e12404e106ed61d48b78ddff2f807b2b3c699967..7870f0ea32199dff333b62762e824acf111bfa21 100644 (file)
@@ -31,6 +31,8 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 -- ensure we prevent duplicate setup
 SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 
+SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+
 BEGIN;
 -- setup transaction origin
 SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
index 3e80c4a0d86efe8a86a843cafe90d7d0cd414248..0cdb0b8a92b73945ff19cdb68dea0c0d01e3b433 100644 (file)
@@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
        }
 }
 
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+       if (ctx->callbacks.filter_by_origin_cb == NULL)
+               return false;
+
+       return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
@@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
        XLogReaderState *r = buf->record;
        TransactionId   xid = XLogRecGetXid(r);
        uint8                   info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+       RepOriginId             origin_id = XLogRecGetOrigin(r);
        Snapshot                snapshot;
        xl_logical_message *message;
 
@@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
        message = (xl_logical_message *) XLogRecGetData(r);
 
+       if (message->dbId != ctx->slot->data.database ||
+               FilterByOrigin(ctx, origin_id))
+               return;
+
        if (message->transactional &&
                !SnapBuildProcessChange(builder, xid, buf->origptr))
                return;
@@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                                                          message->message + message->prefix_size);
 }
 
-static inline bool
-FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
-{
-       if (ctx->callbacks.filter_by_origin_cb == NULL)
-               return false;
-
-       return filter_by_origin_cb_wrapper(ctx, origin_id);
-}
-
 /*
  * Consolidated commit record handling between the different form of commit
  * records.
index 684f7998263519fd2806a5e7c2c4897cedcaa412..efcc25ae9575dfd069becebf0a7c4e7bf7d617bf 100644 (file)
@@ -31,6 +31,8 @@
 
 #include "postgres.h"
 
+#include "miscadmin.h"
+
 #include "access/xact.h"
 
 #include "catalog/indexing.h"
@@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
                GetCurrentTransactionId();
        }
 
+       xlrec.dbId = MyDatabaseId;
        xlrec.transactional = transactional;
        xlrec.prefix_size = strlen(prefix) + 1;
        xlrec.message_size = size;
@@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
        XLogRegisterData((char *) prefix, xlrec.prefix_size);
        XLogRegisterData((char *) message, size);
 
+       /* allow origin filtering */
+       XLogIncludeOrigin();
+
        return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
 }
 
index 749060f9c75a25c38e77c35384c520029ead6028..7089a1c48f79f7e95033076a668944b679dca529 100644 (file)
@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD090 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD091 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
index 8b968d5288efe4e17aae2215ad223f0447b87ef0..23b9cdb268bd65c52348565320dac60a1920c305 100644 (file)
@@ -19,6 +19,7 @@
  */
 typedef struct xl_logical_message
 {
+       Oid         dbId;                                                       /* database Oid emitted from */
        bool            transactional;                                  /* is message transactional? */
        Size            prefix_size;                                    /* length of prefix */
        Size            message_size;                                   /* size of the message */