From: Michael Paquier
Date: Thu, 12 Jul 2018 01:19:51 +0000 (+0900)
Subject: Make logical WAL sender report streaming state appropriately
X-Git-Tag: REL_11_BETA3~89
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=0414ac6a1eb2e457c8019c5a558bd72b37dede89;p=postgresql

Make logical WAL sender report streaming state appropriately

WAL senders sending logically-decoded data fail to properly report the
"streaming" state when starting up: until at least one extra record is
replayed, such WAL senders remain in the "catchup" state, which is
inconsistent with their physical cousins.

This can easily be reproduced, for example, by using pg_recvlogical and
restarting the upstream server.  The TAP tests have been slightly
modified to detect the failure, and strengthened so that future tests
also make sure that a node is in the streaming state when waiting for
it to catch up.

Backpatch down to 9.4, where this code was introduced.

Reported-by: Sawada Masahiko
Author: Simon Riggs, Sawada Masahiko
Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran
Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
---
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..3a0106bc93 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2169,7 +2169,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
 			{
 				ereport(DEBUG1,
-						(errmsg("standby \"%s\" has now caught up with primary",
+						(errmsg("\"%s\" has now caught up with upstream server",
 								application_name)));
 				WalSndSetState(WALSNDSTATE_STREAMING);
 			}
@@ -2758,10 +2758,10 @@ XLogSendLogical(void)
 	char	   *errm;
 
 	/*
-	 * Don't know whether we've caught up yet. We'll set it to true in
-	 * WalSndWaitForWal, if we're actually waiting. We also set to true if
-	 * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
-	 * i.e. when we're shutting down.
+	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
+	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
+	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
+	 * didn't wait - i.e. when we're shutting down.
 	 */
 	WalSndCaughtUp = false;
 
@@ -2774,6 +2774,9 @@ XLogSendLogical(void)
 
 	if (record != NULL)
 	{
+		/* XXX: Note that logical decoding cannot be used while in recovery */
+		XLogRecPtr	flushPtr = GetFlushRecPtr();
+
 		/*
 		 * Note the lack of any call to LagTrackerWrite() which is handled by
 		 * WalSndUpdateProgress which is called by output plugin through
@@ -2782,6 +2785,13 @@ XLogSendLogical(void)
 		LogicalDecodingProcessRecord(logical_decoding_ctx,
 									 logical_decoding_ctx->reader);
 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+
+		/*
+		 * If we have sent a record that is at or beyond the flushed point, we
+		 * have caught up.
+		 */
+		if (sentPtr >= flushPtr)
+			WalSndCaughtUp = true;
 	}
 	else
 	{
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 5f848a0db7..e60673ad72 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1525,7 +1525,8 @@ also works for logical subscriptions)
 until its replication location in pg_stat_replication equals or passes the
 upstream's WAL insert point at the time this function is called. By default
 the replay_lsn is waited for, but 'mode' may be specified to wait for any of
-sent|write|flush|replay.
+sent|write|flush|replay. The connection catching up must be in a streaming
+state.
 
 If there is no active replication connection from this peer, waits until
 poll_query_until timeout.
@@ -1570,7 +1571,7 @@ sub wait_for_catchup
 	  . $lsn_expr . " on "
 	  . $self->name . "\n";
 	my $query =
-	  qq[SELECT $lsn_expr <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+	  qq[SELECT $lsn_expr <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
 	$self->poll_query_until('postgres', $query)
 	  or croak "timed out waiting for catchup";
 	print "done\n";
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 503556fd6c..d94458e00e 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -188,6 +188,11 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
 
+# Restart the publisher and check the state of the subscriber which
+# should be in a streaming state after catching up.
+$node_publisher->stop('fast');
+$node_publisher->start;
+
 $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',
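
For reference, below is a minimal stand-alone TAP sketch of the reproducer
described in the commit message: restart the publisher, then wait for the
logical WAL sender to report the streaming state. It is not part of the
commit; the node, table, publication and subscription names are illustrative,
and it assumes the PostgresNode helpers available in the tree as of this
commit (get_new_node, init, safe_psql, wait_for_catchup).

    use strict;
    use warnings;
    use PostgresNode;
    use TestLib;
    use Test::More tests => 1;

    # Publisher and subscriber both need wal_level = logical.
    my $node_publisher = get_new_node('publisher');
    $node_publisher->init(allows_streaming => 'logical');
    $node_publisher->start;

    my $node_subscriber = get_new_node('subscriber');
    $node_subscriber->init(allows_streaming => 'logical');
    $node_subscriber->start;

    # Minimal publication/subscription pair on a single table.
    $node_publisher->safe_psql('postgres', "CREATE TABLE tab_test (a int)");
    $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_test (a int)");
    $node_publisher->safe_psql('postgres',
        "CREATE PUBLICATION tap_pub FOR TABLE tab_test");

    my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
    my $appname           = 'tap_sub';
    $node_subscriber->safe_psql('postgres',
        "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub");

    # Restart the publisher.  Before the walsender.c fix, the re-spawned
    # logical WAL sender stayed in "catchup" state until one extra record
    # happened to be decoded.
    $node_publisher->stop('fast');
    $node_publisher->start;

    # wait_for_catchup now also insists on state = 'streaming' in
    # pg_stat_replication, so this times out on an unpatched server.
    $node_publisher->wait_for_catchup($appname);

    ok(1, 'logical WAL sender reports streaming state after restart');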