]> granicus.if.org Git - postgresql/commitdiff
Make logical WAL sender report streaming state appropriately
authorMichael Paquier <michael@paquier.xyz>
Thu, 12 Jul 2018 01:19:51 +0000 (10:19 +0900)
committerMichael Paquier <michael@paquier.xyz>
Thu, 12 Jul 2018 01:19:51 +0000 (10:19 +0900)
WAL senders sending logically-decoded data fail to properly report in
"streaming" state when starting up, hence as long as one extra record is
not replayed, such WAL senders would remain in a "catchup" state, which
is inconsistent with the physical cousin.

This can be easily reproduced by for example using pg_recvlogical and
restarting the upstream server.  The TAP tests have been slightly
modified to detect the failure and strengthened so as future tests also
make sure that a node is in streaming state when waiting for its
catchup.

Backpatch down to 9.4 where this code has been introduced.

Reported-by: Sawada Masahiko
Author: Simon Riggs, Sawada Masahiko
Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran
Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com

src/backend/replication/walsender.c
src/test/perl/PostgresNode.pm
src/test/subscription/t/001_rep_changes.pl

index e47ddca6bca2c332d2dfabb3bded19fd16823445..3a0106bc9337affd7ff576e1e435211068c0a599 100644 (file)
@@ -2169,7 +2169,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
                        if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
                        {
                                ereport(DEBUG1,
-                                               (errmsg("standby \"%s\" has now caught up with primary",
+                                               (errmsg("\"%s\" has now caught up with upstream server",
                                                                application_name)));
                                WalSndSetState(WALSNDSTATE_STREAMING);
                        }
@@ -2758,10 +2758,10 @@ XLogSendLogical(void)
        char       *errm;
 
        /*
-        * Don't know whether we've caught up yet. We'll set it to true in
-        * WalSndWaitForWal, if we're actually waiting. We also set to true if
-        * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
-        * i.e. when we're shutting down.
+        * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
+        * true in WalSndWaitForWal, if we're actually waiting. We also set to
+        * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
+        * didn't wait - i.e. when we're shutting down.
         */
        WalSndCaughtUp = false;
 
@@ -2774,6 +2774,9 @@ XLogSendLogical(void)
 
        if (record != NULL)
        {
+               /* XXX: Note that logical decoding cannot be used while in recovery */
+               XLogRecPtr      flushPtr = GetFlushRecPtr();
+
                /*
                 * Note the lack of any call to LagTrackerWrite() which is handled by
                 * WalSndUpdateProgress which is called by output plugin through
@@ -2782,6 +2785,13 @@ XLogSendLogical(void)
                LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
                sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+
+               /*
+                * If we have sent a record that is at or beyond the flushed point, we
+                * have caught up.
+                */
+               if (sentPtr >= flushPtr)
+                       WalSndCaughtUp = true;
        }
        else
        {
index 5f848a0db7a285946764124547ed705db1198570..e60673ad723da07706ad842f3787f272910ff052 100644 (file)
@@ -1525,7 +1525,8 @@ also works for logical subscriptions)
 until its replication location in pg_stat_replication equals or passes the
 upstream's WAL insert point at the time this function is called. By default
 the replay_lsn is waited for, but 'mode' may be specified to wait for any of
-sent|write|flush|replay.
+sent|write|flush|replay. The connection catching up must be in a streaming
+state.
 
 If there is no active replication connection from this peer, waits until
 poll_query_until timeout.
@@ -1570,7 +1571,7 @@ sub wait_for_catchup
          . $lsn_expr . " on "
          . $self->name . "\n";
        my $query =
-         qq[SELECT $lsn_expr <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+         qq[SELECT $lsn_expr <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
        $self->poll_query_until('postgres', $query)
          or croak "timed out waiting for catchup";
        print "done\n";
index 503556fd6cd9146788f1ef9bd5213b7adee96e51..d94458e00e1d216ae082c7a6d4d06628f97db107 100644 (file)
@@ -188,6 +188,11 @@ $node_publisher->safe_psql('postgres',
        "INSERT INTO tab_ins SELECT generate_series(1001,1100)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
 
+# Restart the publisher and check the state of the subscriber which
+# should be in a streaming state after catching up.
+$node_publisher->stop('fast');
+$node_publisher->start;
+
 $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',