From d851bef2d60ff9345249ff67c053e37fe4b364cc Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Mon, 5 Sep 2016 09:44:38 +0100 Subject: [PATCH] Dirty replication slots when using sql interface When pg_logical_slot_get_changes(...) sets confirmed_flush_lsn to the point at which replay stopped, it doesn't dirty the replication slot. So if the replay didn't cause restart_lsn or catalog_xmin to change as well, this change will not get written out to disk. Even on a clean shutdown. If Pg crashes or restarts, a subsequent pg_logical_slot_get_changes(...) call will see the same changes already replayed since it uses the slot's confirmed_flush_lsn as the start point for fetching changes. The caller can't specify a start LSN when using the SQL interface. Mark the slot as dirty after reading changes using the SQL interface so that users won't see repeated changes after a clean shutdown. Repeated changes still occur when using the walsender interface or after an unclean shutdown. Craig Ringer --- .../replication/logical/logicalfuncs.c | 15 +++++++ src/test/recovery/Makefile | 2 + src/test/recovery/t/006_logical_decoding.pl | 40 +++++++++++++++++++ 3 files changed, 57 insertions(+) create mode 100644 src/test/recovery/t/006_logical_decoding.pl diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 4e4c8cdaeb..9c7be2dc7b 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -321,7 +321,22 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * business..) */ if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) + { LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); + /* + * If only the confirmed_flush_lsn has changed the slot won't get + * marked as dirty by the above. Callers on the walsender interface + * are expected to keep track of their own progress and don't need + * it written out. But SQL-interface users cannot specify their own + * start positions and it's harder for them to keep track of their + * progress, so we should make more of an effort to save it for them. + * + * Dirty the slot so it's written out at the next checkpoint. We'll + * still lose its position on crash, as documented, but it's better + * than always losing the position even on clean restart. + */ + ReplicationSlotMarkDirty(); + } /* free context, call shutdown callback */ FreeDecodingContext(ctx); diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile index 929071909a..a847952d72 100644 --- a/src/test/recovery/Makefile +++ b/src/test/recovery/Makefile @@ -18,3 +18,5 @@ check: clean distclean maintainer-clean: rm -rf tmp_check + +EXTRA_INSTALL = contrib/test_decoding diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl new file mode 100644 index 0000000000..b80a9a9415 --- /dev/null +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -0,0 +1,40 @@ +# Testing of logical decoding using SQL interface and/or pg_recvlogical +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# Initialize master node +my $node_master = get_new_node('master'); +$node_master->init(allows_streaming => 1); +$node_master->append_conf( + 'postgresql.conf', qq( +max_replication_slots = 4 +wal_level = logical +)); +$node_master->start; +my $backup_name = 'master_backup'; + +$node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]); + +$node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]); + +$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]); + +# Basic decoding works +my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]); +is(scalar(split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT'); + +# If we immediately crash the server we might lose the progress we just made +# and replay the same changes again. But a clean shutdown should never repeat +# the same changes when we use the SQL decoding interface. +$node_master->restart('fast'); + +# There are no new writes, so the result should be empty. +$result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]); +chomp($result); +is($result, '', 'Decoding after fast restart repeats no rows'); + +# done with the node +$node_master->stop; -- 2.40.0