Logical messages, added in
3fe3511d05, during decoding failed to filter
messages emitted in other databases and messages emitted "under" a
replication origin the output plugin isn't interested in.
Add tests to verify that both types of filtering actually work. While
touching message.sql remove hunk obsoleted by
d25379e.
Bump XLOG_PAGE_MAGIC because xl_logical_message changed and because
3fe3511d05 had omitted doing so.
3fe3511d05 additionally didn't bump
catversion, but
7a542700d has done so since.
Author: Petr Jelinek
Reported-By: Andres Freund
Discussion:
20160406142513.wotqy3ba3kanr423@alap3.anarazel.de
-- predictability
SET synchronous_commit = on;
-SET client_encoding = 'utf8';
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
message: transactional: 1 prefix: test, sz: 11 content:czechtastic
(7 rows)
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
?column?
----------
- init
+ otherdb1
+(1 row)
+
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+ ?column?
+----------
+ otherdb2
+(1 row)
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
+ ?column?
+----------
+ cleanup
(1 row)
-- ensure we prevent duplicate setup
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
ERROR: cannot setup replication origin when one is already setup
+SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+ ?column?
+----------
+
+(1 row)
+
BEGIN;
-- setup transaction origin
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
-- predictability
SET synchronous_commit = on;
-SET client_encoding = 'utf8';
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
-- ensure we prevent duplicate setup
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+
BEGIN;
-- setup transaction origin
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
}
}
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ if (ctx->callbacks.filter_by_origin_cb == NULL)
+ return false;
+
+ return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
/*
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/
XLogReaderState *r = buf->record;
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
Snapshot snapshot;
xl_logical_message *message;
message = (xl_logical_message *) XLogRecGetData(r);
+ if (message->dbId != ctx->slot->data.database ||
+ FilterByOrigin(ctx, origin_id))
+ return;
+
if (message->transactional &&
!SnapBuildProcessChange(builder, xid, buf->origptr))
return;
message->message + message->prefix_size);
}
-static inline bool
-FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
-{
- if (ctx->callbacks.filter_by_origin_cb == NULL)
- return false;
-
- return filter_by_origin_cb_wrapper(ctx, origin_id);
-}
-
/*
* Consolidated commit record handling between the different form of commit
* records.
#include "postgres.h"
+#include "miscadmin.h"
+
#include "access/xact.h"
#include "catalog/indexing.h"
GetCurrentTransactionId();
}
+ xlrec.dbId = MyDatabaseId;
xlrec.transactional = transactional;
xlrec.prefix_size = strlen(prefix) + 1;
xlrec.message_size = size;
XLogRegisterData((char *) prefix, xlrec.prefix_size);
XLogRegisterData((char *) message, size);
+ /* allow origin filtering */
+ XLogIncludeOrigin();
+
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
}
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD090 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD091 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
*/
typedef struct xl_logical_message
{
+ Oid dbId; /* database Oid emitted from */
bool transactional; /* is message transactional? */
Size prefix_size; /* length of prefix */
Size message_size; /* size of the message */