if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{
ereport(DEBUG1,
- (errmsg("standby \"%s\" has now caught up with primary",
+ (errmsg("\"%s\" has now caught up with upstream server",
application_name)));
WalSndSetState(WALSNDSTATE_STREAMING);
}
char *errm;
/*
- * Don't know whether we've caught up yet. We'll set it to true in
- * WalSndWaitForWal, if we're actually waiting. We also set to true if
- * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
- * i.e. when we're shutting down.
+ * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
+ * true in WalSndWaitForWal, if we're actually waiting. We also set to
+ * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
+ * didn't wait - i.e. when we're shutting down.
*/
WalSndCaughtUp = false;
if (record != NULL)
{
+ /* XXX: Note that logical decoding cannot be used while in recovery */
+ XLogRecPtr flushPtr = GetFlushRecPtr();
+
/*
* Note the lack of any call to LagTrackerWrite() which is handled by
* WalSndUpdateProgress which is called by output plugin through
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+
+ /*
+ * If we have sent a record that is at or beyond the flushed point, we
+ * have caught up.
+ */
+ if (sentPtr >= flushPtr)
+ WalSndCaughtUp = true;
}
else
{
until its replication location in pg_stat_replication equals or passes the
upstream's WAL insert point at the time this function is called. By default
the replay_lsn is waited for, but 'mode' may be specified to wait for any of
-sent|write|flush|replay.
+sent|write|flush|replay. The connection catching up must be in a streaming
+state.
If there is no active replication connection from this peer, waits until
poll_query_until timeout.
. $lsn_expr . " on "
. $self->name . "\n";
my $query =
- qq[SELECT $lsn_expr <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+ qq[SELECT $lsn_expr <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
$self->poll_query_until('postgres', $query)
or croak "timed out waiting for catchup";
print "done\n";
"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
+# Restart the publisher and check the state of the subscriber which
+# should be in a streaming state after catching up.
+$node_publisher->stop('fast');
+$node_publisher->start;
+
$node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',