]> granicus.if.org Git - postgresql/commitdiff
Fix logical replication slot initialization
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 1 Aug 2018 21:39:07 +0000 (17:39 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 1 Aug 2018 21:47:15 +0000 (17:47 -0400)
This was broken in commit 9c7d06d60680, which inadvertently gave the
wrong value to fast_forward in one StartupDecodingContext call.  Fix by
flipping the value.  Add a test for the obvious error, namely trying to
initialize a replication slot with an nonexistent output plugin.

While at it, move the CreateDecodingContext call earlier, so that any
errors are reported before sending the CopyBoth message.

Author: Dave Cramer <davecramer@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CADK3HHLVkeRe1v4P02-5hj55H3_yJg3AEtpXyEY5T3wuzO2jSg@mail.gmail.com

contrib/test_decoding/expected/slot.out
contrib/test_decoding/sql/slot.sql
src/backend/replication/logical/logical.c
src/backend/replication/walsender.c

index 2737a8a301bfbe7fb6517bc3b9f9a91553e5e463..523621a705db0b4bff9d50fda029562ed43d6272 100644 (file)
@@ -30,6 +30,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'tes
  init
 (1 row)
 
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+ERROR:  could not access file "nonexistent": No such file or directory
 -- here we want to start a new session and wait till old one is gone
 select pg_backend_pid() as oldpid \gset
 \c -
index 24cdf7155d750a850adf7539002f6988a940687b..c8d08f85417e703f480297b4bac48ef5da55fd53 100644 (file)
@@ -9,6 +9,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test
 
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true);
 
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+
 -- here we want to start a new session and wait till old one is gone
 select pg_backend_pid() as oldpid \gset
 \c -
index 3cd4eefb9bf776c8cfb06fa311e23d371011d76d..bb83fc9d42dcc1e298614556391366502752075c 100644 (file)
@@ -312,7 +312,7 @@ CreateInitDecodingContext(char *plugin,
        ReplicationSlotSave();
 
        ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-                                                                need_full_snapshot, true,
+                                                                need_full_snapshot, false,
                                                                 read_page, prepare_write, do_write,
                                                                 update_progress);
 
index d60026dfd1a0c3ccce5e5a190813274eb236024c..c83ff3b573297925db720027a70ec999e2c900a0 100644 (file)
@@ -1068,6 +1068,19 @@ StartLogicalReplication(StartReplicationCmd *cmd)
                got_STOPPING = true;
        }
 
+       /*
+        * Create our decoding context, making it start at the previously ack'ed
+        * position.
+        *
+        * Do this before sending CopyBoth, so that any errors are reported early.
+        */
+       logical_decoding_ctx =
+               CreateDecodingContext(cmd->startpoint, cmd->options, false,
+                                                         logical_read_xlog_page,
+                                                         WalSndPrepareWrite, WalSndWriteData,
+                                                         WalSndUpdateProgress);
+
+
        WalSndSetState(WALSNDSTATE_CATCHUP);
 
        /* Send a CopyBothResponse message, and start streaming */
@@ -1077,16 +1090,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
        pq_endmessage(&buf);
        pq_flush();
 
-       /*
-        * Initialize position to the last ack'ed one, then the xlog records begin
-        * to be shipped from that position.
-        */
-       logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
-                                                                                                false,
-                                                                                                logical_read_xlog_page,
-                                                                                                WalSndPrepareWrite,
-                                                                                                WalSndWriteData,
-                                                                                                WalSndUpdateProgress);
 
        /* Start reading WAL from the oldest required WAL. */
        logical_startptr = MyReplicationSlot->data.restart_lsn;