From 6f60fdd7015b032bf49273c99f80913d57eac284 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 11 Oct 2012 17:39:52 +0300 Subject: [PATCH] Improve replication connection timeouts. Rename replication_timeout to wal_sender_timeout, and add a new setting called wal_receiver_timeout that does the same at the walreceiver side. There was previously no timeout in walreceiver, so if the network went down, for example, the walreceiver could take a long time to notice that the connection was lost. Now with the two settings, both sides of a replication connection will detect a broken connection similarly. It is no longer necessary to manually set wal_receiver_status_interval to a value smaller than the timeout. Both wal sender and receiver now automatically send a "ping" message if more than 1/2 of the configured timeout has elapsed, and it hasn't received any messages from the other end. Amit Kapila, heavily edited by me. --- doc/src/sgml/config.sgml | 36 ++++--- doc/src/sgml/release-9.1.sgml | 2 +- src/backend/replication/walreceiver.c | 94 ++++++++++++++++--- src/backend/replication/walsender.c | 88 ++++++++++------- src/backend/utils/misc/guc.c | 15 ++- src/backend/utils/misc/postgresql.conf.sample | 5 +- src/include/replication/walprotocol.h | 12 +++ src/include/replication/walreceiver.h | 2 + src/include/replication/walsender.h | 2 +- src/include/replication/walsender_private.h | 1 - 10 files changed, 190 insertions(+), 67 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 8388416430..b4fcbaf9c7 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2236,10 +2236,10 @@ include 'filename' - - replication_timeout (integer) + + wal_sender_timeout (integer) - replication_timeout configuration parameter + wal_sender_timeout configuration parameter @@ -2251,12 +2251,6 @@ include 'filename' the postgresql.conf file or on the server command line. The default value is 60 seconds. 
- - To prevent connections from being terminated prematurely, - - must be enabled on the standby, and its value must be less than the - value of replication_timeout. - @@ -2474,11 +2468,6 @@ include 'filename' the postgresql.conf file or on the server command line. The default value is 10 seconds. - - When is enabled on a sending server, - wal_receiver_status_interval must be enabled, and its value - must be less than the value of replication_timeout. - @@ -2507,6 +2496,25 @@ include 'filename' + + wal_receiver_timeout (integer) + + wal_receiver_timeout configuration parameter + + + + Terminate replication connections that are inactive longer + than the specified number of milliseconds. This is useful for + the receiving standby server to detect a primary node crash or network + outage. + A value of zero disables the timeout mechanism. This parameter + can only be set in + the postgresql.conf file or on the server command line. + The default value is 60 seconds. + + + + diff --git a/doc/src/sgml/release-9.1.sgml b/doc/src/sgml/release-9.1.sgml index 6bc1c8c90e..5fbdd7a195 100644 --- a/doc/src/sgml/release-9.1.sgml +++ b/doc/src/sgml/release-9.1.sgml @@ -3322,7 +3322,7 @@ Add - replication_timeout + replication_timeout setting (Fujii Masao, Heikki Linnakangas) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 869457003a..b613df4c6a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -55,6 +55,7 @@ /* GUC variables */ int wal_receiver_status_interval; +int wal_receiver_timeout; bool hot_standby_feedback; /* libpqreceiver hooks to these when loaded */ @@ -121,7 +122,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvSendReply(void); +static void XLogWalRcvSendReply(bool 
force, bool requestReply); static void XLogWalRcvSendHSFeedback(void); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -170,9 +171,10 @@ WalReceiverMain(void) { char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; - /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + TimestampTz last_recv_timestamp; + bool ping_sent; /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -282,6 +284,10 @@ WalReceiverMain(void) MemSet(&reply_message, 0, sizeof(reply_message)); MemSet(&feedback_message, 0, sizeof(feedback_message)); + /* Initialize the last recv timestamp */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + /* Loop until end-of-streaming or error */ for (;;) { @@ -316,15 +322,23 @@ WalReceiverMain(void) /* Wait a while for data to arrive */ if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { + /* Something was received from master, so reset timeout */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + /* Accept the received data, and process it */ XLogWalRcvProcessMsg(type, buf, len); /* Receive any more data we can without sleeping */ while (walrcv_receive(0, &type, &buf, &len)) + { + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; XLogWalRcvProcessMsg(type, buf, len); + } /* Let the master know that we received some data. */ - XLogWalRcvSendReply(); + XLogWalRcvSendReply(false, false); /* * If we've written some records, flush them to disk and let the @@ -335,10 +349,48 @@ WalReceiverMain(void) else { /* - * We didn't receive anything new, but send a status update to the - * master anyway, to report any progress in applying WAL. + * We didn't receive anything new. If we haven't heard anything + * from the server for more than wal_receiver_timeout / 2, + * ping the server. 
Also, if it's been longer than + * wal_receiver_status_interval since the last update we sent, + * send a status update to the master anyway, to report any + * progress in applying WAL. + */ + bool requestReply = false; + + /* + * Check if time since last receive from the primary has reached the + * configured limit. */ - XLogWalRcvSendReply(); + if (wal_receiver_timeout > 0) + { + TimestampTz now = GetCurrentTimestamp(); + TimestampTz timeout; + + timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, + wal_receiver_timeout); + + if (now >= timeout) + ereport(ERROR, + (errmsg("terminating walreceiver due to timeout"))); + + /* + * We didn't receive anything new for half of + * wal_receiver_timeout. Ping the server. + */ + if (!ping_sent) + { + timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, + (wal_receiver_timeout/2)); + if (now >= timeout) + { + requestReply = true; + ping_sent = true; + } + } + } + + XLogWalRcvSendReply(requestReply, requestReply); XLogWalRcvSendHSFeedback(); } } @@ -460,6 +512,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage)); ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime); + + /* If the primary requested a reply, send one immediately */ + if (keepalive.replyRequested) + XLogWalRcvSendReply(true, false); break; } default: @@ -609,19 +665,25 @@ XLogWalRcvFlush(bool dying) /* Also let the master know that we made some progress */ if (!dying) - { - XLogWalRcvSendReply(); - XLogWalRcvSendHSFeedback(); - } + XLogWalRcvSendReply(false, false); } } /* - * Send reply message to primary, indicating our current XLOG positions and - * the current time. + * Send reply message to primary, indicating our current XLOG positions, oldest + * xmin and the current time. + * + * If 'force' is not set, the message is only sent if enough time has + * passed since last status update to reach wal_receiver_status_interval. 
+ * If wal_receiver_status_interval is disabled altogether and 'force' is + * false, this is a no-op. + * + * If 'requestReply' is true, requests the server to reply immediately upon + * receiving this message. This is used for heartbeats, when approaching + * wal_receiver_timeout. */ static void -XLogWalRcvSendReply(void) +XLogWalRcvSendReply(bool force, bool requestReply) { char buf[sizeof(StandbyReplyMessage) + 1]; TimestampTz now; @@ -630,7 +692,7 @@ XLogWalRcvSendReply(void) * If the user doesn't want status to be reported to the master, be sure * to exit before doing anything at all. */ - if (wal_receiver_status_interval <= 0) + if (!force && wal_receiver_status_interval <= 0) return; /* Get current timestamp. */ @@ -645,7 +707,8 @@ XLogWalRcvSendReply(void) * this is only for reporting purposes and only on idle systems, that's * probably OK. */ - if (XLByteEQ(reply_message.write, LogstreamResult.Write) + if (!force + && XLByteEQ(reply_message.write, LogstreamResult.Write) && XLByteEQ(reply_message.flush, LogstreamResult.Flush) && !TimestampDifferenceExceeds(reply_message.sendTime, now, wal_receiver_status_interval * 1000)) @@ -656,6 +719,7 @@ XLogWalRcvSendReply(void) reply_message.flush = LogstreamResult.Flush; reply_message.apply = GetXLogReplayRecPtr(NULL); reply_message.sendTime = now; + reply_message.replyRequested = requestReply; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", (uint32) (reply_message.write >> 32), (uint32) reply_message.write, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4a7515fb13..2af38f1cbe 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -82,7 +82,7 @@ static bool replication_started = false; /* Started streaming yet? 
*/ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ -int replication_timeout = 60 * 1000; /* maximum time to send one +int wal_sender_timeout = 60 * 1000; /* maximum time to send one * WAL data message */ /* * State for WalSndWakeupRequest @@ -103,15 +103,20 @@ static uint32 sendOff = 0; */ static XLogRecPtr sentPtr = 0; +/* Buffer for processing reply messages. */ +static StringInfoData reply_message; /* - * Buffer for processing reply messages. + * Buffer for constructing outgoing messages. + * (1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE bytes) */ -static StringInfoData reply_message; +static char *output_message; /* * Timestamp of the last receipt of the reply from the standby. */ static TimestampTz last_reply_timestamp; +/* Have we sent a heartbeat message asking for reply, since last reply? */ +static bool ping_sent = false; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; @@ -126,14 +131,14 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS); static void WalSndLoop(void) __attribute__((noreturn)); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); -static void XLogSend(char *msgbuf, bool *caughtup); +static void XLogSend(bool *caughtup); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); -static void WalSndKeepalive(char *msgbuf); +static void WalSndKeepalive(bool requestReply); /* Initialize walsender process before entering the main command loop */ @@ -465,7 +470,10 @@ ProcessRepliesIfAny(void) * Save the last reply timestamp if we've received at least one reply. 
*/ if (received) + { last_reply_timestamp = GetCurrentTimestamp(); + ping_sent = false; + } } /* @@ -527,6 +535,10 @@ ProcessStandbyReplyMessage(void) (uint32) (reply.flush >> 32), (uint32) reply.flush, (uint32) (reply.apply >> 32), (uint32) reply.apply); + /* Send a reply if the standby requested one. */ + if (reply.replyRequested) + WalSndKeepalive(false); + /* * Update shared state for this WalSender process based on reply data from * standby. @@ -620,7 +632,6 @@ ProcessStandbyHSFeedbackMessage(void) static void WalSndLoop(void) { - char *output_message; bool caughtup = false; /* @@ -638,6 +649,7 @@ WalSndLoop(void) /* Initialize the last reply timestamp */ last_reply_timestamp = GetCurrentTimestamp(); + ping_sent = false; /* Loop forever, unless we get an error */ for (;;) @@ -672,7 +684,7 @@ WalSndLoop(void) * caught up. */ if (!pq_is_send_pending()) - XLogSend(output_message, &caughtup); + XLogSend(&caughtup); else caughtup = false; @@ -708,7 +720,7 @@ WalSndLoop(void) if (walsender_ready_to_stop) { /* ... let's just be real sure we're caught up ... */ - XLogSend(output_message, &caughtup); + XLogSend(&caughtup); if (caughtup && !pq_is_send_pending()) { /* Inform the standby that XLOG streaming is done */ @@ -738,23 +750,34 @@ WalSndLoop(void) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - else if (MyWalSnd->sendKeepalive) + else if (wal_sender_timeout > 0 && !ping_sent) { - WalSndKeepalive(output_message); - /* Try to flush pending output to the client */ - if (pq_flush_if_writable() != 0) - break; + /* + * If half of wal_sender_timeout has lapsed without receiving + * any reply from standby, send a keep-alive message to standby + * requesting an immediate reply. 
+ */ + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + if (GetCurrentTimestamp() >= timeout) + { + WalSndKeepalive(true); + ping_sent = true; + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + break; + } } /* Determine time until replication timeout */ - if (replication_timeout > 0) + if (wal_sender_timeout > 0) { timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, - replication_timeout); - sleeptime = 1 + (replication_timeout / 10); + wal_sender_timeout); + sleeptime = 1 + (wal_sender_timeout / 10); } - /* Sleep until something happens or replication timeout */ + /* Sleep until something happens or we time out */ ImmediateInterruptOK = true; CHECK_FOR_INTERRUPTS(); WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, @@ -766,8 +789,7 @@ WalSndLoop(void) * possibility that the client replied just as we reached the * timeout ... he's supposed to reply *before* that. */ - if (replication_timeout > 0 && - GetCurrentTimestamp() >= timeout) + if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout) { /* * Since typically expiration of replication timeout means @@ -1016,15 +1038,11 @@ retry: * but not yet sent to the client, and buffer it in the libpq output * buffer. * - * msgbuf is a work area in which the output message is constructed. It's - * passed in just so we can avoid re-palloc'ing the buffer on each cycle. - * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE. - * * If there is no unsent WAL remaining, *caughtup is set to true, otherwise * *caughtup is set to false. */ static void -XLogSend(char *msgbuf, bool *caughtup) +XLogSend(bool *caughtup) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; @@ -1107,13 +1125,13 @@ XLogSend(char *msgbuf, bool *caughtup) /* * OK to read and send the slice. */ - msgbuf[0] = 'w'; + output_message[0] = 'w'; /* * Read the log directly into the output buffer to avoid extra memcpy * calls. 
*/ - XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); + XLogRead(output_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); /* * We fill the message header last so that the send timestamp is taken as @@ -1123,9 +1141,9 @@ XLogSend(char *msgbuf, bool *caughtup) msghdr.walEnd = SendRqstPtr; msghdr.sendTime = GetCurrentTimestamp(); - memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); + memcpy(output_message + 1, &msghdr, sizeof(WalDataMessageHeader)); - pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); + pq_putmessage_noblock('d', output_message, 1 + sizeof(WalDataMessageHeader) + nbytes); sentPtr = endptr; @@ -1492,21 +1510,27 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) return (Datum) 0; } +/* + * This function is used to send keepalive message to standby. + * If requestReply is set, sets a flag in the message requesting the standby + * to send a message back to us, for heartbeat purposes. + */ static void -WalSndKeepalive(char *msgbuf) +WalSndKeepalive(bool requestReply) { PrimaryKeepaliveMessage keepalive_message; /* Construct a new message */ keepalive_message.walEnd = sentPtr; keepalive_message.sendTime = GetCurrentTimestamp(); + keepalive_message.replyRequested = requestReply; elog(DEBUG2, "sending replication keepalive"); /* Prepend with the message type and send it. 
*/ - msgbuf[0] = 'k'; - memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage)); - pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1); + output_message[0] = 'k'; + memcpy(output_message + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage)); + pq_putmessage_noblock('d', output_message, sizeof(PrimaryKeepaliveMessage) + 1); } /* diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 6b202e0425..745e7be68e 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1595,6 +1595,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Sets the maximum wait time to receive data from master."), + NULL, + GUC_UNIT_MS + }, + &wal_receiver_timeout, + 60 * 1000, 0, INT_MAX, + NULL, NULL, NULL + }, + { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), @@ -2019,12 +2030,12 @@ static struct config_int ConfigureNamesInt[] = }, { - {"replication_timeout", PGC_SIGHUP, REPLICATION_SENDING, + {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the maximum time to wait for WAL replication."), NULL, GUC_UNIT_MS }, - &replication_timeout, + &wal_sender_timeout, 60 * 1000, 0, INT_MAX, NULL, NULL, NULL }, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 10f3fb1b24..eeb9b82abf 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -210,7 +210,7 @@ #max_wal_senders = 0 # max number of walsender processes # (change requires restart) #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables -#replication_timeout = 60s # in milliseconds; 0 disables +#wal_sender_timeout = 60s # in milliseconds; 0 disables # - Master Server - @@ -237,6 +237,9 @@ # 0 disables #hot_standby_feedback = off # 
send info from standby to prevent # query conflicts +#wal_receiver_timeout = 60s # time that receiver waits for + # communication from master + # in milliseconds; 0 disables #------------------------------------------------------------------------------ diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h index 0305fb7e59..396d006ea7 100644 --- a/src/include/replication/walprotocol.h +++ b/src/include/replication/walprotocol.h @@ -27,6 +27,12 @@ typedef struct /* Sender's system clock at the time of transmission */ TimestampTz sendTime; + + /* + * If replyRequested is set, the client should reply immediately to this + * message, to avoid a timeout disconnect. + */ + bool replyRequested; } WalSndrMessage; @@ -80,6 +86,12 @@ typedef struct /* Sender's system clock at the time of transmission */ TimestampTz sendTime; + + /* + * If replyRequested is set, the server should reply immediately to this + * message, to avoid a timeout disconnect. + */ + bool replyRequested; } StandbyReplyMessage; /* diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 62b6d2d93f..4cf5a27e6d 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -17,7 +17,9 @@ #include "storage/spin.h" #include "pgtime.h" +/* user-settable parameters */ extern int wal_receiver_status_interval; +extern int wal_receiver_timeout; extern bool hot_standby_feedback; /* diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 78e8558299..df8e951478 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -24,7 +24,7 @@ extern bool wake_wal_senders; /* user-settable parameters */ extern int max_wal_senders; -extern int replication_timeout; +extern int wal_sender_timeout; extern void InitWalSender(void); extern void exec_replication_command(const char *query_string); diff --git a/src/include/replication/walsender_private.h 
b/src/include/replication/walsender_private.h index 45cd7444cd..66234cd8b5 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -37,7 +37,6 @@ typedef struct WalSnd XLogRecPtr sentPtr; /* WAL has been sent up to this point */ bool needreload; /* does currently-open file need to be * reloaded? */ - bool sendKeepalive; /* do we send keepalives on this connection? */ /* * The xlog locations that have been written, flushed, and applied by -- 2.40.0