]> granicus.if.org Git - postgresql/commitdiff
Send new protocol keepalive messages to standby servers.
authorSimon Riggs <simon@2ndQuadrant.com>
Sat, 31 Dec 2011 13:30:26 +0000 (13:30 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Sat, 31 Dec 2011 13:30:26 +0000 (13:30 +0000)
Allows streaming replication users to calculate transfer latency
and apply delay via internal functions. No external functions yet.

doc/src/sgml/protocol.sgml
src/backend/access/transam/xlog.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/replication/walsender.c
src/include/access/xlog.h
src/include/replication/walprotocol.h
src/include/replication/walreceiver.h

index d6332e58cf79f981bf7396df782686fc0b0738d7..71c40cc592e2b0cd5075075886c2d948af254f56 100644 (file)
@@ -1463,6 +1463,54 @@ The commands accepted in walsender mode are:
        CopyData message):
      </para>
 
+     <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          Primary keepalive message (B)
+      </term>
+      <listitem>
+      <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          Byte1('k')
+      </term>
+      <listitem>
+      <para>
+          Identifies the message as a sender keepalive.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte8
+      </term>
+      <listitem>
+      <para>
+          The current end of WAL on the server, given in
+          XLogRecPtr format.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte8
+      </term>
+      <listitem>
+      <para>
+          The server's system clock at the time of transmission,
+          given in TimestampTz format.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+     </para>
+
      <para>
       <variablelist>
       <varlistentry>
index 41800a46040b56f911a950ef87df293a11ae1560..d98a763fda67606e16bd01a91ab85701598acd7e 100644 (file)
@@ -452,6 +452,9 @@ typedef struct XLogCtlData
        XLogRecPtr      recoveryLastRecPtr;
        /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
        TimestampTz recoveryLastXTime;
+       /* timestamp of when we started replaying the current chunk of WAL data,
+        * only relevant for replication or archive recovery */
+       TimestampTz currentChunkStartTime;
        /* end of the last record restored from the archive */
        XLogRecPtr      restoreLastRecPtr;
        /* Are we requested to pause recovery? */
@@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
 static void recoveryPausesHere(void);
 static void SetLatestXTime(TimestampTz xtime);
+static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
 static void LocalSetXLogInsertAllowed(void);
@@ -5847,6 +5851,41 @@ GetLatestXTime(void)
        return xtime;
 }
 
+/*
+ * Save timestamp of the next chunk of WAL records to apply.
+ *
+ * We keep this in XLogCtl, not a simple static variable, so that it can be
+ * seen by all backends.
+ */
+static void
+SetCurrentChunkStartTime(TimestampTz xtime)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       xlogctl->currentChunkStartTime = xtime;
+       SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Fetch timestamp of latest processed commit/abort record.
+ * Startup process maintains an accurate local copy in XLogReceiptTime
+ */
+TimestampTz
+GetCurrentChunkReplayStartTime(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       TimestampTz xtime;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       xtime = xlogctl->currentChunkStartTime;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return xtime;
+}
+
 /*
  * Returns time of receipt of current chunk of XLOG data, as well as
  * whether it was received from streaming replication or from archives.
@@ -6390,6 +6429,7 @@ StartupXLOG(void)
                xlogctl->replayEndRecPtr = ReadRecPtr;
                xlogctl->recoveryLastRecPtr = ReadRecPtr;
                xlogctl->recoveryLastXTime = 0;
+               xlogctl->currentChunkStartTime = 0;
                xlogctl->recoveryPause = false;
                SpinLockRelease(&xlogctl->info_lck);
 
@@ -9696,7 +9736,10 @@ retry:
                                                {
                                                        havedata = true;
                                                        if (!XLByteLT(*RecPtr, latestChunkStart))
+                                                       {
                                                                XLogReceiptTime = GetCurrentTimestamp();
+                                                               SetCurrentChunkStartTime(XLogReceiptTime);
+                                                       }
                                                }
                                                else
                                                        havedata = false;
index 1f12dcb62aa0fba4dfd6f859caa1163e5f47ed4e..8106d6b3a41ea680fa7e76e0c135753adc75b686 100644 (file)
@@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(void);
 static void XLogWalRcvSendHSFeedback(void);
+static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -218,6 +219,10 @@ WalReceiverMain(void)
        /* Fetch information required to start streaming */
        strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
        startpoint = walrcv->receiveStart;
+
+       /* Initialise to a sanish value */
+       walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
+
        SpinLockRelease(&walrcv->mutex);
 
        /* Arrange to clean up at walreceiver exit */
@@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
                                                         errmsg_internal("invalid WAL message received from primary")));
                                /* memcpy is required here for alignment reasons */
                                memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
+
+                               ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
+
                                buf += sizeof(WalDataMessageHeader);
                                len -= sizeof(WalDataMessageHeader);
-
                                XLogWalRcvWrite(buf, len, msghdr.dataStart);
                                break;
                        }
+               case 'k':                               /* Keepalive */
+                       {
+                               PrimaryKeepaliveMessage keepalive;
+
+                               if (len != sizeof(PrimaryKeepaliveMessage))
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                        errmsg_internal("invalid keepalive message received from primary")));
+                               /* memcpy is required here for alignment reasons */
+                               memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
+
+                               ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+                               break;
+                       }
                default:
                        ereport(ERROR,
                                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void)
        memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
        walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
 }
+
+/*
+ * Keep track of important messages from primary.
+ */
+static void
+ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalRcvData *walrcv = WalRcv;
+
+       TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+
+       /* Update shared-memory status */
+       SpinLockAcquire(&walrcv->mutex);
+       walrcv->lastMsgSendTime = sendTime;
+       walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
+       SpinLockRelease(&walrcv->mutex);
+
+       elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d",
+                                       timestamptz_to_str(sendTime),
+                                       timestamptz_to_str(lastMsgReceiptTime),
+                                       GetReplicationApplyDelay(),
+                                       GetReplicationTransferLatency());
+}
index 5bce1c34a1b5f06670b83efce9636fe49d3eff75..054355b2c59f0b9ec895030aa12e199c9e81e866 100644 (file)
@@ -28,6 +28,7 @@
 #include "replication/walreceiver.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/timestamp.h"
 
 WalRcvData *WalRcv = NULL;
 
@@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
 
        return recptr;
 }
+
+/*
+ * Returns the replication apply delay in ms
+ */
+int
+GetReplicationApplyDelay(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalRcvData *walrcv = WalRcv;
+
+       XLogRecPtr      receivePtr;
+       XLogRecPtr      replayPtr;
+
+       long    secs;
+       int             usecs;
+
+       SpinLockAcquire(&walrcv->mutex);
+       receivePtr = walrcv->receivedUpto;
+       SpinLockRelease(&walrcv->mutex);
+
+       replayPtr = GetXLogReplayRecPtr(NULL);
+
+       if (XLByteLE(receivePtr, replayPtr))
+               return 0;
+
+       TimestampDifference(GetCurrentChunkReplayStartTime(),
+                                               GetCurrentTimestamp(),
+                                               &secs, &usecs);
+
+       return (((int) secs * 1000) + (usecs / 1000));
+}
+
+/*
+ * Returns the network latency in ms, note that this includes any
+ * difference in clock settings between the servers, as well as timezone.
+ */
+int
+GetReplicationTransferLatency(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalRcvData *walrcv = WalRcv;
+
+       TimestampTz lastMsgSendTime;
+       TimestampTz lastMsgReceiptTime;
+
+       long    secs = 0;
+       int             usecs = 0;
+       int             ms;
+
+       SpinLockAcquire(&walrcv->mutex);
+       lastMsgSendTime = walrcv->lastMsgSendTime;
+       lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
+       SpinLockRelease(&walrcv->mutex);
+
+       TimestampDifference(lastMsgSendTime,
+                                               lastMsgReceiptTime,
+                                               &secs, &usecs);
+
+       ms = ((int) secs * 1000) + (usecs / 1000);
+
+       return ms;
+}
index ea865204172263fe0dc143c52c994a0bde562845..ed7298b6ee8fc5580630466096e24ee16cafd9b2 100644 (file)
@@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
 
 
 /* Main entry point for walsender process */
@@ -823,30 +824,24 @@ WalSndLoop(void)
                 */
                if (caughtup || pq_is_send_pending())
                {
-                       TimestampTz finish_time = 0;
-                       long            sleeptime = -1;
+                       TimestampTz timeout = 0;
+                       long            sleeptime = 10000; /* 10 s */
                        int                     wakeEvents;
 
                        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
-                               WL_SOCKET_READABLE;
+                               WL_SOCKET_READABLE | WL_TIMEOUT;
+
                        if (pq_is_send_pending())
                                wakeEvents |= WL_SOCKET_WRITEABLE;
+                       else
+                               WalSndKeepalive(output_message);
 
                        /* Determine time until replication timeout */
                        if (replication_timeout > 0)
                        {
-                               long            secs;
-                               int                     usecs;
-
-                               finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+                               timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
                                                                                                                  replication_timeout);
-                               TimestampDifference(GetCurrentTimestamp(),
-                                                                       finish_time, &secs, &usecs);
-                               sleeptime = secs * 1000 + usecs / 1000;
-                               /* Avoid Assert in WaitLatchOrSocket if timeout is past */
-                               if (sleeptime < 0)
-                                       sleeptime = 0;
-                               wakeEvents |= WL_TIMEOUT;
+                               sleeptime = 1 + (replication_timeout / 10);
                        }
 
                        /* Sleep until something happens or replication timeout */
@@ -859,7 +854,7 @@ WalSndLoop(void)
                         * timeout ... he's supposed to reply *before* that.
                         */
                        if (replication_timeout > 0 &&
-                               GetCurrentTimestamp() >= finish_time)
+                               GetCurrentTimestamp() >= timeout)
                        {
                                /*
                                 * Since typically expiration of replication timeout means
@@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        return (Datum) 0;
 }
 
+static void
+WalSndKeepalive(char *msgbuf)
+{
+       PrimaryKeepaliveMessage keepalive_message;
+
+       /* Construct a new message */
+       keepalive_message.walEnd = sentPtr;
+       keepalive_message.sendTime = GetCurrentTimestamp();
+
+       elog(DEBUG2, "sending replication keepalive");
+
+       /* Prepend with the message type and send it. */
+       msgbuf[0] = 'k';
+       memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+       pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
 /*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the
index 86ab3276caf125e79b380173c7fff840e98a67cf..4b1f8b8c2f34cdd6990c72f3e75246424ae2939a 100644 (file)
@@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void);
 extern bool RecoveryIsPaused(void);
 extern void SetRecoveryPause(bool recoveryPause);
 extern TimestampTz GetLatestXTime(void);
+extern TimestampTz GetCurrentChunkReplayStartTime(void);
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
index 656c8fc17fdf5973d4869657d38958b0724c9c3e..053376d3774d945385113c4e7c1515fff6e71186 100644 (file)
 #include "datatype/timestamp.h"
 
 
+/*
+ * All messages from WalSender must contain these fields to allow us to
+ * correctly calculate the replication delay.
+ */
+typedef struct
+{
+       /* Current end of WAL on the sender */
+       XLogRecPtr      walEnd;
+
+       /* Sender's system clock at the time of transmission */
+       TimestampTz sendTime;
+} WalSndrMessage;
+
+
 /*
  * Header for a WAL data message (message type 'w').  This is wrapped within
  * a CopyData message at the FE/BE protocol level.
@@ -39,6 +53,14 @@ typedef struct
        TimestampTz sendTime;
 } WalDataMessageHeader;
 
+/*
+ * Keepalive message from primary (message type 'k'). (lowercase k)
+ * This is wrapped within a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef WalSndrMessage PrimaryKeepaliveMessage;
+
 /*
  * Reply message from standby (message type 'r').  This is wrapped within
  * a CopyData message at the FE/BE protocol level.
index 77f525209170cbeeb0eaa10f77cc8f18a6fff334..926730c9f823a6917e0943ae5279a3452d1d15fb 100644 (file)
@@ -78,6 +78,12 @@ typedef struct
         */
        XLogRecPtr      latestChunkStart;
 
+       /*
+        * Time of send and receive of any message received.
+        */
+       TimestampTz lastMsgSendTime;
+       TimestampTz lastMsgReceiptTime;
+
        /*
         * connection string; is used for walreceiver to connect with the primary.
         */
@@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern int GetReplicationApplyDelay(void);
+extern int GetReplicationTransferLatency(void);
 
 #endif   /* _WALRECEIVER_H */