From 64233902d22ba42846397cb7551894217522fad4 Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Sat, 31 Dec 2011 13:30:26 +0000 Subject: [PATCH] Send new protocol keepalive messages to standby servers. Allows streaming replication users to calculate transfer latency and apply delay via internal functions. No external functions yet. --- doc/src/sgml/protocol.sgml | 48 +++++++++++++++++ src/backend/access/transam/xlog.c | 43 +++++++++++++++ src/backend/replication/walreceiver.c | 47 +++++++++++++++- src/backend/replication/walreceiverfuncs.c | 63 ++++++++++++++++++++++ src/backend/replication/walsender.c | 42 +++++++++------ src/include/access/xlog.h | 1 + src/include/replication/walprotocol.h | 22 ++++++++ src/include/replication/walreceiver.h | 8 +++ 8 files changed, 258 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index d6332e58cf..71c40cc592 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1463,6 +1463,54 @@ The commands accepted in walsender mode are: CopyData message): + + + + + Primary keepalive message (B) + + + + + + + Byte1('k') + + + + Identifies the message as a sender keepalive. + + + + + + Byte8 + + + + The current end of WAL on the server, given in + XLogRecPtr format. + + + + + + Byte8 + + + + The server's system clock at the time of transmission, + given in TimestampTz format. 
+ + + + + + + + + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 41800a4604..d98a763fda 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -452,6 +452,9 @@ typedef struct XLogCtlData XLogRecPtr recoveryLastRecPtr; /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* timestamp of when we started replaying the current chunk of WAL data, + * only relevant for replication or archive recovery */ + TimestampTz currentChunkStartTime; /* end of the last record restored from the archive */ XLogRecPtr restoreLastRecPtr; /* Are we requested to pause recovery? */ @@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI, static bool recoveryStopsHere(XLogRecord *record, bool *includeThis); static void recoveryPausesHere(void); static void SetLatestXTime(TimestampTz xtime); +static void SetCurrentChunkStartTime(TimestampTz xtime); static void CheckRequiredParameterValues(void); static void XLogReportParameters(void); static void LocalSetXLogInsertAllowed(void); @@ -5847,6 +5851,41 @@ GetLatestXTime(void) return xtime; } +/* + * Save timestamp of the next chunk of WAL records to apply. + * + * We keep this in XLogCtl, not a simple static variable, so that it can be + * seen by all backends. + */ +static void +SetCurrentChunkStartTime(TimestampTz xtime) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->currentChunkStartTime = xtime; + SpinLockRelease(&xlogctl->info_lck); +} + +/* + * Fetch timestamp of when replay of the current chunk of WAL data started.
+ * Startup process maintains an accurate local copy in XLogReceiptTime + */ +TimestampTz +GetCurrentChunkReplayStartTime(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + TimestampTz xtime; + + SpinLockAcquire(&xlogctl->info_lck); + xtime = xlogctl->currentChunkStartTime; + SpinLockRelease(&xlogctl->info_lck); + + return xtime; +} + /* * Returns time of receipt of current chunk of XLOG data, as well as * whether it was received from streaming replication or from archives. @@ -6390,6 +6429,7 @@ StartupXLOG(void) xlogctl->replayEndRecPtr = ReadRecPtr; xlogctl->recoveryLastRecPtr = ReadRecPtr; xlogctl->recoveryLastXTime = 0; + xlogctl->currentChunkStartTime = 0; xlogctl->recoveryPause = false; SpinLockRelease(&xlogctl->info_lck); @@ -9696,7 +9736,10 @@ retry: { havedata = true; if (!XLByteLT(*RecPtr, latestChunkStart)) + { XLogReceiptTime = GetCurrentTimestamp(); + SetCurrentChunkStartTime(XLogReceiptTime); + } } else havedata = false; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 1f12dcb62a..8106d6b3a4 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); static void XLogWalRcvSendReply(void); static void XLogWalRcvSendHSFeedback(void); +static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); @@ -218,6 +219,10 @@ WalReceiverMain(void) /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); startpoint = walrcv->receiveStart; + + /* Initialise to a sanish value */ + walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp(); + SpinLockRelease(&walrcv->mutex); /* Arrange to clean up at walreceiver exit */ @@ -433,12 +438,28 @@ 
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) errmsg_internal("invalid WAL message received from primary"))); /* memcpy is required here for alignment reasons */ memcpy(&msghdr, buf, sizeof(WalDataMessageHeader)); + + ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime); + buf += sizeof(WalDataMessageHeader); len -= sizeof(WalDataMessageHeader); - XLogWalRcvWrite(buf, len, msghdr.dataStart); break; } + case 'k': /* Keepalive */ + { + PrimaryKeepaliveMessage keepalive; + + if (len != sizeof(PrimaryKeepaliveMessage)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid keepalive message received from primary"))); + /* memcpy is required here for alignment reasons */ + memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage)); + + ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime); + break; + } default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void) memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage)); walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1); } + +/* + * Record the send and receipt times of the latest message from the primary.
+ */ +static void +ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); + + /* Update shared-memory status */ + SpinLockAcquire(&walrcv->mutex); + walrcv->lastMsgSendTime = sendTime; + walrcv->lastMsgReceiptTime = lastMsgReceiptTime; + SpinLockRelease(&walrcv->mutex); + + elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d", + timestamptz_to_str(sendTime), + timestamptz_to_str(lastMsgReceiptTime), + GetReplicationApplyDelay(), + GetReplicationTransferLatency()); +} diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 5bce1c34a1..054355b2c5 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -28,6 +28,7 @@ #include "replication/walreceiver.h" #include "storage/pmsignal.h" #include "storage/shmem.h" +#include "utils/timestamp.h" WalRcvData *WalRcv = NULL; @@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart) return recptr; } + +/* + * Returns the replication apply delay in ms, or 0 if replay is caught up + */ +int +GetReplicationApplyDelay(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + XLogRecPtr receivePtr; + XLogRecPtr replayPtr; + + long secs; + int usecs; + + SpinLockAcquire(&walrcv->mutex); + receivePtr = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + replayPtr = GetXLogReplayRecPtr(NULL); + + if (XLByteLE(receivePtr, replayPtr)) + return 0; + + TimestampDifference(GetCurrentChunkReplayStartTime(), + GetCurrentTimestamp(), + &secs, &usecs); + + return (((int) secs * 1000) + (usecs / 1000)); +} + +/* + * Returns the network latency in ms. Note that this includes any + * difference in clock settings between the servers, as well as timezone.
+ */ +int +GetReplicationTransferLatency(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + TimestampTz lastMsgSendTime; + TimestampTz lastMsgReceiptTime; + + long secs = 0; + int usecs = 0; + int ms; + + SpinLockAcquire(&walrcv->mutex); + lastMsgSendTime = walrcv->lastMsgSendTime; + lastMsgReceiptTime = walrcv->lastMsgReceiptTime; + SpinLockRelease(&walrcv->mutex); + + TimestampDifference(lastMsgSendTime, + lastMsgReceiptTime, + &secs, &usecs); + + ms = ((int) secs * 1000) + (usecs / 1000); + + return ms; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ea86520417..ed7298b6ee 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); +static void WalSndKeepalive(char *msgbuf); /* Main entry point for walsender process */ @@ -823,30 +824,24 @@ WalSndLoop(void) */ if (caughtup || pq_is_send_pending()) { - TimestampTz finish_time = 0; - long sleeptime = -1; + TimestampTz timeout = 0; + long sleeptime = 10000; /* 10 s */ int wakeEvents; wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | - WL_SOCKET_READABLE; + WL_SOCKET_READABLE | WL_TIMEOUT; + if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; + else + WalSndKeepalive(output_message); /* Determine time until replication timeout */ if (replication_timeout > 0) { - long secs; - int usecs; - - finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, replication_timeout); - TimestampDifference(GetCurrentTimestamp(), - finish_time, &secs, &usecs); - sleeptime = secs * 1000 + usecs / 1000; - /* Avoid Assert in WaitLatchOrSocket if timeout is past */ - if (sleeptime < 0) - sleeptime = 0; - wakeEvents |= WL_TIMEOUT; + 
sleeptime = 1 + (replication_timeout / 10); } /* Sleep until something happens or replication timeout */ @@ -859,7 +854,7 @@ WalSndLoop(void) * timeout ... he's supposed to reply *before* that. */ if (replication_timeout > 0 && - GetCurrentTimestamp() >= finish_time) + GetCurrentTimestamp() >= timeout) { /* * Since typically expiration of replication timeout means @@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) return (Datum) 0; } +static void +WalSndKeepalive(char *msgbuf) +{ + PrimaryKeepaliveMessage keepalive_message; + + /* Construct a new message */ + keepalive_message.walEnd = sentPtr; + keepalive_message.sendTime = GetCurrentTimestamp(); + + elog(DEBUG2, "sending replication keepalive"); + + /* Prepend with the message type and send it. */ + msgbuf[0] = 'k'; + memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage)); + pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1); +} + /* * This isn't currently used for anything. Monitoring tools might be * interested in the future, and we'll need something like this in the diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 86ab3276ca..4b1f8b8c2f 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void); extern bool RecoveryIsPaused(void); extern void SetRecoveryPause(bool recoveryPause); extern TimestampTz GetLatestXTime(void); +extern TimestampTz GetCurrentChunkReplayStartTime(void); extern void UpdateControlFile(void); extern uint64 GetSystemIdentifier(void); diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h index 656c8fc17f..053376d377 100644 --- a/src/include/replication/walprotocol.h +++ b/src/include/replication/walprotocol.h @@ -16,6 +16,20 @@ #include "datatype/timestamp.h" +/* + * All messages from WalSender must contain these fields to allow us to + * correctly calculate the replication delay. 
+ */ +typedef struct +{ + /* Current end of WAL on the sender */ + XLogRecPtr walEnd; + + /* Sender's system clock at the time of transmission */ + TimestampTz sendTime; +} WalSndrMessage; + + /* * Header for a WAL data message (message type 'w'). This is wrapped within * a CopyData message at the FE/BE protocol level. @@ -39,6 +53,14 @@ typedef struct TimestampTz sendTime; } WalDataMessageHeader; +/* + * Keepalive message from primary (message type 'k'). (lowercase k) + * This is wrapped within a CopyData message at the FE/BE protocol level. + * + * Note that the data length is not specified here. + */ +typedef WalSndrMessage PrimaryKeepaliveMessage; + /* * Reply message from standby (message type 'r'). This is wrapped within * a CopyData message at the FE/BE protocol level. diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 77f5252091..926730c9f8 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -78,6 +78,12 @@ typedef struct */ XLogRecPtr latestChunkStart; + /* + * Send time and receipt time of the last message received from the primary. + */ + TimestampTz lastMsgSendTime; + TimestampTz lastMsgReceiptTime; + /* * connection string; is used for walreceiver to connect with the primary. */ @@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void); extern bool WalRcvInProgress(void); extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart); +extern int GetReplicationApplyDelay(void); +extern int GetReplicationTransferLatency(void); #endif /* _WALRECEIVER_H */ -- 2.40.0