From 2dde11a632d3fe309b5af5480d01a0a3028f7f64 Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Tue, 15 Jul 2014 14:40:23 +0100 Subject: [PATCH] Reset master xmin when hot_standby_feedback disabled. If walsender has xmin of standby then ensure we reset the value to 0 when we change from hot_standby_feedback=on to hot_standby_feedback=off. --- doc/src/sgml/protocol.sgml | 5 ++- src/backend/replication/walreceiver.c | 45 ++++++++++++++++++++------- src/backend/replication/walsender.c | 5 ++- 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 3d72a162eb..08c2f00893 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1617,7 +1617,10 @@ The commands accepted in walsender mode are: - The standby's current xmin. + The standby's current xmin. This may be 0, if the standby is + sending notification that Hot Standby feedback will no + longer be sent on this connection. Later non-zero messages may + reinitiate the feedback mechanism. diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7743422ab9..753316e8c3 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -38,6 +38,7 @@ #include #include +#include "access/transam.h" #include "access/xlog_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" @@ -123,7 +124,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); static void XLogWalRcvSendReply(void); -static void XLogWalRcvSendHSFeedback(void); +static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); /* Signal handlers */ @@ -312,6 +313,7 @@ WalReceiverMain(void) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + XLogWalRcvSendHSFeedback(true); } /* Wait a while for data to arrive */ @@ -340,7 +342,7 @@ WalReceiverMain(void) * master anyway, to report any progress in applying WAL. */ XLogWalRcvSendReply(); - XLogWalRcvSendHSFeedback(); + XLogWalRcvSendHSFeedback(false); } } } @@ -621,7 +623,7 @@ XLogWalRcvFlush(bool dying) if (!dying) { XLogWalRcvSendReply(); - XLogWalRcvSendHSFeedback(); + XLogWalRcvSendHSFeedback(false); } } } @@ -681,45 +683,62 @@ XLogWalRcvSendReply(void) /* * Send hot standby feedback message to primary, plus the current time, * in case they don't have a watch. + * + * If the user disables feedback, send one final message to tell sender + * to forget about the xmin on this standby. */ static void -XLogWalRcvSendHSFeedback(void) +XLogWalRcvSendHSFeedback(bool immed) { char buf[sizeof(StandbyHSFeedbackMessage) + 1]; TimestampTz now; TransactionId nextXid; uint32 nextEpoch; TransactionId xmin; + static TimestampTz sendTime = 0; + static bool master_has_standby_xmin = false; /* * If the user doesn't want status to be reported to the master, be sure * to exit before doing anything at all. */ - if (wal_receiver_status_interval <= 0 || !hot_standby_feedback) + if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) && + !master_has_standby_xmin) return; /* Get current timestamp. */ now = GetCurrentTimestamp(); - /* - * Send feedback at most once per wal_receiver_status_interval. - */ - if (!TimestampDifferenceExceeds(feedback_message.sendTime, now, + if (!immed) + { + /* + * Send feedback at most once per wal_receiver_status_interval. + */ + if (!TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) - return; + return; + } + + sendTime = now; /* * If Hot Standby is not yet active there is nothing to send. Check this * after the interval has expired to reduce number of calls. */ if (!HotStandbyActive()) + { + Assert(!master_has_standby_xmin); return; + } /* * Make the expensive call to get the oldest xmin once we are certain * everything else has been checked. */ - xmin = GetOldestXmin(true, false); + if (hot_standby_feedback) + xmin = GetOldestXmin(true, false); + else + xmin = InvalidTransactionId; /* * Get epoch and adjust if nextXid and oldestXmin are different sides of @@ -744,6 +763,10 @@ XLogWalRcvSendHSFeedback(void) buf[0] = 'h'; memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage)); walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1); + if (TransactionIdIsValid(xmin)) + master_has_standby_xmin = true; + else + master_has_standby_xmin = false; } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c972a1ab98..e73982a6fd 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -656,9 +656,12 @@ ProcessStandbyHSFeedbackMessage(void) reply.xmin, reply.epoch); - /* Ignore invalid xmin (can't actually happen with current walreceiver) */ + /* Unset WalSender's xmin if the feedback message value is invalid */ if (!TransactionIdIsNormal(reply.xmin)) + { + MyPgXact->xmin = InvalidTransactionId; return; + } /* * Check that the provided xmin/epoch are sane, that is, not in the future -- 2.40.0