CopyData message):
</para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term>
+ Primary keepalive message (B)
+ </term>
+ <listitem>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term>
+ Byte1('k')
+ </term>
+ <listitem>
+ <para>
+ Identifies the message as a sender keepalive.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Byte8
+ </term>
+ <listitem>
+ <para>
+ The current end of WAL on the server, given in
+ XLogRecPtr format.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Byte8
+ </term>
+ <listitem>
+ <para>
+ The server's system clock at the time of transmission,
+ given in TimestampTz format.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
<para>
<variablelist>
<varlistentry>
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* timestamp of when we started replaying the current chunk of WAL data,
+ * only relevant for replication or archive recovery */
+ TimestampTz currentChunkStartTime;
/* end of the last record restored from the archive */
XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static void recoveryPausesHere(void);
static void SetLatestXTime(TimestampTz xtime);
+static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
static void LocalSetXLogInsertAllowed(void);
return xtime;
}
+/*
+ * Save timestamp of the next chunk of WAL records to apply.
+ *
+ * We keep this in XLogCtl, not a simple static variable, so that it can be
+ * seen by all backends.
+ */
+static void
+SetCurrentChunkStartTime(TimestampTz xtime)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->currentChunkStartTime = xtime;
+ SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Fetch timestamp of latest processed commit/abort record.
+ * Startup process maintains an accurate local copy in XLogReceiptTime
+ */
+TimestampTz
+GetCurrentChunkReplayStartTime(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ TimestampTz xtime;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xtime = xlogctl->currentChunkStartTime;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return xtime;
+}
+
/*
* Returns time of receipt of current chunk of XLOG data, as well as
* whether it was received from streaming replication or from archives.
xlogctl->replayEndRecPtr = ReadRecPtr;
xlogctl->recoveryLastRecPtr = ReadRecPtr;
xlogctl->recoveryLastXTime = 0;
+ xlogctl->currentChunkStartTime = 0;
xlogctl->recoveryPause = false;
SpinLockRelease(&xlogctl->info_lck);
{
havedata = true;
if (!XLByteLT(*RecPtr, latestChunkStart))
+ {
XLogReceiptTime = GetCurrentTimestamp();
+ SetCurrentChunkStartTime(XLogReceiptTime);
+ }
}
else
havedata = false;
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(void);
static void XLogWalRcvSendHSFeedback(void);
+static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receiveStart;
+
+ /* Initialise to a sanish value */
+ walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
+
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
errmsg_internal("invalid WAL message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
+
+ ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
+
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
-
XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
+ case 'k': /* Keepalive */
+ {
+ PrimaryKeepaliveMessage keepalive;
+
+ if (len != sizeof(PrimaryKeepaliveMessage))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid keepalive message received from primary")));
+ /* memcpy is required here for alignment reasons */
+ memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
+
+ ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+ break;
+ }
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
}
+
+/*
+ * Keep track of important messages from primary.
+ */
+static void
+ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+
+ /* Update shared-memory status */
+ SpinLockAcquire(&walrcv->mutex);
+ walrcv->lastMsgSendTime = sendTime;
+ walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
+ SpinLockRelease(&walrcv->mutex);
+
+ elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d",
+ timestamptz_to_str(sendTime),
+ timestamptz_to_str(lastMsgReceiptTime),
+ GetReplicationApplyDelay(),
+ GetReplicationTransferLatency());
+}
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
+#include "utils/timestamp.h"
WalRcvData *WalRcv = NULL;
return recptr;
}
+
+/*
+ * Returns the replication apply delay in ms
+ */
+int
+GetReplicationApplyDelay(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ XLogRecPtr receivePtr;
+ XLogRecPtr replayPtr;
+
+ long secs;
+ int usecs;
+
+ SpinLockAcquire(&walrcv->mutex);
+ receivePtr = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
+
+ replayPtr = GetXLogReplayRecPtr(NULL);
+
+ if (XLByteLE(receivePtr, replayPtr))
+ return 0;
+
+ TimestampDifference(GetCurrentChunkReplayStartTime(),
+ GetCurrentTimestamp(),
+ &secs, &usecs);
+
+ return (((int) secs * 1000) + (usecs / 1000));
+}
+
+/*
+ * Returns the network latency in ms, note that this includes any
+ * difference in clock settings between the servers, as well as timezone.
+ */
+int
+GetReplicationTransferLatency(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ TimestampTz lastMsgSendTime;
+ TimestampTz lastMsgReceiptTime;
+
+ long secs = 0;
+ int usecs = 0;
+ int ms;
+
+ SpinLockAcquire(&walrcv->mutex);
+ lastMsgSendTime = walrcv->lastMsgSendTime;
+ lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
+ SpinLockRelease(&walrcv->mutex);
+
+ TimestampDifference(lastMsgSendTime,
+ lastMsgReceiptTime,
+ &secs, &usecs);
+
+ ms = ((int) secs * 1000) + (usecs / 1000);
+
+ return ms;
+}
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
/* Main entry point for walsender process */
*/
if (caughtup || pq_is_send_pending())
{
- TimestampTz finish_time = 0;
- long sleeptime = -1;
+ TimestampTz timeout = 0;
+ long sleeptime = 10000; /* 10 s */
int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
- WL_SOCKET_READABLE;
+ WL_SOCKET_READABLE | WL_TIMEOUT;
+
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
+ else
+ WalSndKeepalive(output_message);
/* Determine time until replication timeout */
if (replication_timeout > 0)
{
- long secs;
- int usecs;
-
- finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
replication_timeout);
- TimestampDifference(GetCurrentTimestamp(),
- finish_time, &secs, &usecs);
- sleeptime = secs * 1000 + usecs / 1000;
- /* Avoid Assert in WaitLatchOrSocket if timeout is past */
- if (sleeptime < 0)
- sleeptime = 0;
- wakeEvents |= WL_TIMEOUT;
+ sleeptime = 1 + (replication_timeout / 10);
}
/* Sleep until something happens or replication timeout */
* timeout ... he's supposed to reply *before* that.
*/
if (replication_timeout > 0 &&
- GetCurrentTimestamp() >= finish_time)
+ GetCurrentTimestamp() >= timeout)
{
/*
* Since typically expiration of replication timeout means
return (Datum) 0;
}
+static void
+WalSndKeepalive(char *msgbuf)
+{
+ PrimaryKeepaliveMessage keepalive_message;
+
+ /* Construct a new message */
+ keepalive_message.walEnd = sentPtr;
+ keepalive_message.sendTime = GetCurrentTimestamp();
+
+ elog(DEBUG2, "sending replication keepalive");
+
+ /* Prepend with the message type and send it. */
+ msgbuf[0] = 'k';
+ memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+ pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the
extern bool RecoveryIsPaused(void);
extern void SetRecoveryPause(bool recoveryPause);
extern TimestampTz GetLatestXTime(void);
+extern TimestampTz GetCurrentChunkReplayStartTime(void);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
#include "datatype/timestamp.h"
+/*
+ * All messages from WalSender must contain these fields to allow us to
+ * correctly calculate the replication delay.
+ */
+typedef struct
+{
+ /* Current end of WAL on the sender */
+ XLogRecPtr walEnd;
+
+ /* Sender's system clock at the time of transmission */
+ TimestampTz sendTime;
+} WalSndrMessage;
+
+
/*
* Header for a WAL data message (message type 'w'). This is wrapped within
* a CopyData message at the FE/BE protocol level.
TimestampTz sendTime;
} WalDataMessageHeader;
+/*
+ * Keepalive message from primary (message type 'k'). (lowercase k)
+ * This is wrapped within a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef WalSndrMessage PrimaryKeepaliveMessage;
+
/*
* Reply message from standby (message type 'r'). This is wrapped within
* a CopyData message at the FE/BE protocol level.
*/
XLogRecPtr latestChunkStart;
+ /*
+ * Time of send and receive of any message received.
+ */
+ TimestampTz lastMsgSendTime;
+ TimestampTz lastMsgReceiptTime;
+
/*
* connection string; is used for walreceiver to connect with the primary.
*/
extern bool WalRcvInProgress(void);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern int GetReplicationApplyDelay(void);
+extern int GetReplicationTransferLatency(void);
#endif /* _WALRECEIVER_H */