has already been recycled. On success, server responds with a
CopyBothResponse message, and then starts to stream WAL to the frontend.
WAL will continue to be streamed until the connection is broken;
- no further commands will be accepted.
+ no further commands will be accepted. If the WAL sender process is
+ terminated normally (during postmaster shutdown), it will send a
+ CommandComplete message before exiting. This might not happen during an
+ abnormal shutdown, of course.
</para>
<para>
WAL data is sent as a series of CopyData messages. (This allows
other information to be intermixed; in particular the server can send
an ErrorResponse message if it encounters a failure after beginning
- to stream.) The payload in each CopyData message follows this format:
+ to stream.) The payload of each CopyData message from server to the
+ client contains a message of one of the following formats:
</para>
<para>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
- The starting point of the WAL data in this message, given in
- XLogRecPtr format.
+ The starting point of the WAL data in this message.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
- The current end of WAL on the server, given in
- XLogRecPtr format.
+ The current end of WAL on the server.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
- The server's system clock at the time of transmission,
- given in TimestampTz format.
+ The server's system clock at the time of transmission, as
+ microseconds since midnight on 2000-01-01.
</para>
</listitem>
</varlistentry>
<para>
A section of the WAL data stream.
</para>
+ <para>
+ A single WAL record is never split across two XLogData messages.
+ When a WAL record crosses a WAL page boundary, and is therefore
+ already split using continuation records, it can be split at the page
+ boundary. In other words, the first main WAL record and its
+ continuation records can be sent in different XLogData messages.
+ </para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
- </variablelist>
- </para>
- <para>
- A single WAL record is never split across two CopyData messages.
- When a WAL record crosses a WAL page boundary, and is therefore
- already split using continuation records, it can be split at the page
- boundary. In other words, the first main WAL record and its
- continuation records can be sent in different CopyData messages.
- </para>
- <para>
- Note that all fields within the WAL data and the above-described header
- will be in the sending server's native format. Endianness, and the
- format for the timestamp, are unpredictable unless the receiver has
- verified that the sender's system identifier matches its own
- <filename>pg_control</> contents.
- </para>
- <para>
- If the WAL sender process is terminated normally (during postmaster
- shutdown), it will send a CommandComplete message before exiting.
- This might not happen during an abnormal shutdown, of course.
- </para>
-
- <para>
- The receiving process can send replies back to the sender at any time,
- using one of the following message formats (also in the payload of a
- CopyData message):
- </para>
-
- <para>
- <variablelist>
<varlistentry>
<term>
Primary keepalive message (B)
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
- The current end of WAL on the server, given in
- XLogRecPtr format.
+ The current end of WAL on the server.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
- The server's system clock at the time of transmission,
- given in TimestampTz format.
+ The server's system clock at the time of transmission, as
+ microseconds since midnight on 2000-01-01.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Byte1
+ </term>
+ <listitem>
+ <para>
+ 1 means that the client should reply to this message as soon as
+ possible, to avoid a timeout disconnect. 0 otherwise.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
+ <para>
+ The receiving process can send replies back to the sender at any time,
+ using one of the following message formats (also in the payload of a
+ CopyData message):
+ </para>
+
<para>
<variablelist>
<varlistentry>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
The location of the last WAL byte + 1 received and written to disk
- in the standby, in XLogRecPtr format.
+ in the standby.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
The location of the last WAL byte + 1 flushed to disk in
- the standby, in XLogRecPtr format.
+ the standby.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Int64
+ </term>
+ <listitem>
+ <para>
+ The location of the last WAL byte + 1 applied in the standby.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
- The location of the last WAL byte + 1 applied in the standby, in
- XLogRecPtr format.
+ The client's system clock at the time of transmission, as
+ microseconds since midnight on 2000-01-01.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Byte1
</term>
<listitem>
<para>
- The server's system clock at the time of transmission,
- given in TimestampTz format.
+ If 1, the client requests the server to reply to this message
+ immediately. This can be used to ping the server, to test if
+ the connection is still healthy.
</para>
</listitem>
</varlistentry>
</varlistentry>
<varlistentry>
<term>
- Byte8
+ Int64
</term>
<listitem>
<para>
- The server's system clock at the time of transmission,
- given in TimestampTz format.
+ The client's system clock at the time of transmission, as
+ microseconds since midnight on 2000-01-01.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
- Byte4
+ Int32
</term>
<listitem>
<para>
- The standby's current xmin.
+ The standby's current xmin. This may be 0, if the standby does not
+ support feedback, or is not yet in Hot Standby state.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
- Byte4
+ Int32
</term>
<listitem>
<para>
#include <unistd.h>
#include "access/xlog_internal.h"
+#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
-#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
-static StandbyReplyMessage reply_message;
-static StandbyHSFeedbackMessage feedback_message;
+static StringInfoData reply_message;
+static StringInfoData incoming_message;
/*
* About SIGTERM handling:
walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
- /* Initialize LogstreamResult, reply_message and feedback_message */
+ /* Initialize LogstreamResult and buffers for processing messages */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
- MemSet(&reply_message, 0, sizeof(reply_message));
- MemSet(&feedback_message, 0, sizeof(feedback_message));
+ initStringInfo(&reply_message);
+ initStringInfo(&incoming_message);
/* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
static void
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
+ int hdrlen;
+ XLogRecPtr dataStart;
+ XLogRecPtr walEnd;
+ TimestampTz sendTime;
+ bool replyRequested;
+
+ resetStringInfo(&incoming_message);
+
switch (type)
{
case 'w': /* WAL records */
{
- WalDataMessageHeader msghdr;
-
- if (len < sizeof(WalDataMessageHeader))
+ /* copy message to StringInfo */
+ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
+ if (len < hdrlen)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid WAL message received from primary")));
- /* memcpy is required here for alignment reasons */
- memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
-
- ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
-
- buf += sizeof(WalDataMessageHeader);
- len -= sizeof(WalDataMessageHeader);
- XLogWalRcvWrite(buf, len, msghdr.dataStart);
+ appendBinaryStringInfo(&incoming_message, buf, hdrlen);
+
+ /* read the fields */
+ dataStart = pq_getmsgint64(&incoming_message);
+ walEnd = pq_getmsgint64(&incoming_message);
+ sendTime = IntegerTimestampToTimestampTz(
+ pq_getmsgint64(&incoming_message));
+ ProcessWalSndrMessage(walEnd, sendTime);
+
+ buf += hdrlen;
+ len -= hdrlen;
+ XLogWalRcvWrite(buf, len, dataStart);
break;
}
case 'k': /* Keepalive */
{
- PrimaryKeepaliveMessage keepalive;
-
- if (len != sizeof(PrimaryKeepaliveMessage))
+ /* copy message to StringInfo */
+ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
+ if (len != hdrlen)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid keepalive message received from primary")));
- /* memcpy is required here for alignment reasons */
- memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
+ appendBinaryStringInfo(&incoming_message, buf, hdrlen);
- ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+ /* read the fields */
+ walEnd = pq_getmsgint64(&incoming_message);
+ sendTime = IntegerTimestampToTimestampTz(
+ pq_getmsgint64(&incoming_message));
+ replyRequested = pq_getmsgbyte(&incoming_message);
+
+ ProcessWalSndrMessage(walEnd, sendTime);
/* If the primary requested a reply, send one immediately */
- if (keepalive.replyRequested)
+ if (replyRequested)
XLogWalRcvSendReply(true, false);
break;
}
static void
XLogWalRcvSendReply(bool force, bool requestReply)
{
- char buf[sizeof(StandbyReplyMessage) + 1];
+ static XLogRecPtr writePtr = 0;
+ static XLogRecPtr flushPtr = 0;
+ XLogRecPtr applyPtr;
+ static TimestampTz sendTime = 0;
TimestampTz now;
/*
* probably OK.
*/
if (!force
- && XLByteEQ(reply_message.write, LogstreamResult.Write)
- && XLByteEQ(reply_message.flush, LogstreamResult.Flush)
- && !TimestampDifferenceExceeds(reply_message.sendTime, now,
+ && XLByteEQ(writePtr, LogstreamResult.Write)
+ && XLByteEQ(flushPtr, LogstreamResult.Flush)
+ && !TimestampDifferenceExceeds(sendTime, now,
wal_receiver_status_interval * 1000))
return;
+ sendTime = now;
/* Construct a new message */
- reply_message.write = LogstreamResult.Write;
- reply_message.flush = LogstreamResult.Flush;
- reply_message.apply = GetXLogReplayRecPtr(NULL);
- reply_message.sendTime = now;
- reply_message.replyRequested = requestReply;
-
- elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
- (uint32) (reply_message.write >> 32), (uint32) reply_message.write,
- (uint32) (reply_message.flush >> 32), (uint32) reply_message.flush,
- (uint32) (reply_message.apply >> 32), (uint32) reply_message.apply);
-
- /* Prepend with the message type and send it. */
- buf[0] = 'r';
- memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
- walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
+ writePtr = LogstreamResult.Write;
+ flushPtr = LogstreamResult.Flush;
+ applyPtr = GetXLogReplayRecPtr(NULL);
+
+ resetStringInfo(&reply_message);
+ pq_sendbyte(&reply_message, 'r');
+ pq_sendint64(&reply_message, writePtr);
+ pq_sendint64(&reply_message, flushPtr);
+ pq_sendint64(&reply_message, applyPtr);
+ pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+ pq_sendbyte(&reply_message, requestReply ? 1 : 0);
+
+ /* Send it */
+ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
+ (uint32) (writePtr >> 32), (uint32) writePtr,
+ (uint32) (flushPtr >> 32), (uint32) flushPtr,
+ (uint32) (applyPtr >> 32), (uint32) applyPtr,
+ requestReply ? " (reply requested)" : "");
+
+ walrcv_send(reply_message.data, reply_message.len);
}
/*
static void
XLogWalRcvSendHSFeedback(void)
{
- char buf[sizeof(StandbyHSFeedbackMessage) + 1];
TimestampTz now;
TransactionId nextXid;
uint32 nextEpoch;
TransactionId xmin;
+ static TimestampTz sendTime = 0;
/*
* If the user doesn't want status to be reported to the master, be sure
/*
* Send feedback at most once per wal_receiver_status_interval.
*/
- if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
+ if (!TimestampDifferenceExceeds(sendTime, now,
wal_receiver_status_interval * 1000))
return;
+ sendTime = now;
/*
* If Hot Standby is not yet active there is nothing to send. Check this
if (nextXid < xmin)
nextEpoch--;
- /*
- * Always send feedback message.
- */
- feedback_message.sendTime = now;
- feedback_message.xmin = xmin;
- feedback_message.epoch = nextEpoch;
-
elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
- feedback_message.xmin,
- feedback_message.epoch);
-
- /* Prepend with the message type and send it. */
- buf[0] = 'h';
- memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
- walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
+ xmin, nextEpoch);
+
+ /* Construct the the message and send it. */
+ resetStringInfo(&reply_message);
+ pq_sendbyte(&reply_message, 'h');
+ pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+ pq_sendint(&reply_message, xmin, 4);
+ pq_sendint(&reply_message, nextEpoch, 4);
+ walrcv_send(reply_message.data, reply_message.len);
}
/*
- * Keep track of important messages from primary.
+ * Update shared memory status upon receiving a message from primary.
+ *
+ * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
+ * message, reported by primary.
*/
static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
#include "nodes/replnodes.h"
#include "replication/basebackup.h"
#include "replication/syncrep.h"
-#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+/*
+ * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
+ *
+ * We don't have a good idea of what a good value would be; there's some
+ * overhead per message in both walsender and walreceiver, but on the other
+ * hand sending large batches makes walsender less responsive to signals
+ * because signals are checked only between messages. 128kB (with
+ * default 8k blocks) seems like a reasonable guess for now.
+ */
+#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;
*/
static XLogRecPtr sentPtr = 0;
-/* Buffer for processing reply messages. */
+/* Buffers for constructing outgoing messages and processing reply messages. */
+static StringInfoData output_message;
static StringInfoData reply_message;
-/*
- * Buffer for constructing outgoing messages.
- * (1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE bytes)
- */
-static char *output_message;
+static StringInfoData tmpbuf;
/*
* Timestamp of the last receipt of the reply from the standby.
static void
ProcessStandbyReplyMessage(void)
{
- StandbyReplyMessage reply;
-
- pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
-
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
- (uint32) (reply.write >> 32), (uint32) reply.write,
- (uint32) (reply.flush >> 32), (uint32) reply.flush,
- (uint32) (reply.apply >> 32), (uint32) reply.apply);
+ XLogRecPtr writePtr,
+ flushPtr,
+ applyPtr;
+ bool replyRequested;
+
+ /* the caller already consumed the msgtype byte */
+ writePtr = pq_getmsgint64(&reply_message);
+ flushPtr = pq_getmsgint64(&reply_message);
+ applyPtr = pq_getmsgint64(&reply_message);
+ (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ replyRequested = pq_getmsgbyte(&reply_message);
+
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
+ (uint32) (writePtr >> 32), (uint32) writePtr,
+ (uint32) (flushPtr >> 32), (uint32) flushPtr,
+ (uint32) (applyPtr >> 32), (uint32) applyPtr,
+ replyRequested ? " (reply requested)" : "");
/* Send a reply if the standby requested one. */
- if (reply.replyRequested)
+ if (replyRequested)
WalSndKeepalive(false);
/*
volatile WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
- walsnd->write = reply.write;
- walsnd->flush = reply.flush;
- walsnd->apply = reply.apply;
+ walsnd->write = writePtr;
+ walsnd->flush = flushPtr;
+ walsnd->apply = applyPtr;
SpinLockRelease(&walsnd->mutex);
}
static void
ProcessStandbyHSFeedbackMessage(void)
{
- StandbyHSFeedbackMessage reply;
TransactionId nextXid;
uint32 nextEpoch;
+ TransactionId feedbackXmin;
+ uint32 feedbackEpoch;
- /* Decipher the reply message */
- pq_copymsgbytes(&reply_message, (char *) &reply,
- sizeof(StandbyHSFeedbackMessage));
+ /*
+ * Decipher the reply message. The caller already consumed the msgtype
+ * byte.
+ */
+ (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ feedbackXmin = pq_getmsgint(&reply_message, 4);
+ feedbackEpoch = pq_getmsgint(&reply_message, 4);
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
- reply.xmin,
- reply.epoch);
+ feedbackXmin,
+ feedbackEpoch);
/* Ignore invalid xmin (can't actually happen with current walreceiver) */
- if (!TransactionIdIsNormal(reply.xmin))
+ if (!TransactionIdIsNormal(feedbackXmin))
return;
/*
*/
GetNextXidAndEpoch(&nextXid, &nextEpoch);
- if (reply.xmin <= nextXid)
+ if (feedbackXmin <= nextXid)
{
- if (reply.epoch != nextEpoch)
+ if (feedbackEpoch != nextEpoch)
return;
}
else
{
- if (reply.epoch + 1 != nextEpoch)
+ if (feedbackEpoch + 1 != nextEpoch)
return;
}
- if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+ if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
return; /* epoch OK, but it's wrapped around */
/*
* cleanup conflicts on the standby server.
*
* There is a small window for a race condition here: although we just
- * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+ * checked that feedbackXmin precedes nextXid, the nextXid could have gotten
* advanced between our fetching it and applying the xmin below, perhaps
- * far enough to make reply.xmin wrap around. In that case the xmin we
+ * far enough to make feedbackXmin wrap around. In that case the xmin we
* set here would be "in the future" and have no effect. No point in
* worrying about this since it's too late to save the desired data
* anyway. Assuming that the standby sends us an increasing sequence of
* safe, and if we're moving it backwards, well, the data is at risk
* already since a VACUUM could have just finished calling GetOldestXmin.)
*/
- MyPgXact->xmin = reply.xmin;
+ MyPgXact->xmin = feedbackXmin;
}
/* Main loop of walsender process that streams the WAL over Copy messages. */
bool caughtup = false;
/*
- * Allocate buffer that will be used for each output message. We do this
- * just once to reduce palloc overhead. The buffer must be made large
- * enough for maximum-sized messages.
- */
- output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
-
- /*
- * Allocate buffer that will be used for processing reply messages. As
- * above, do this just once to reduce palloc overhead.
+ * Allocate buffers that will be used for each outgoing and incoming
+ * message. We do this just once to reduce palloc overhead.
*/
+ initStringInfo(&output_message);
initStringInfo(&reply_message);
+ initStringInfo(&tmpbuf);
/* Initialize the last reply timestamp */
last_reply_timestamp = GetCurrentTimestamp();
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
- WalDataMessageHeader msghdr;
/*
* Attempt to send all data that's already been written out and fsync'd to
/*
* OK to read and send the slice.
*/
- output_message[0] = 'w';
+ resetStringInfo(&output_message);
+ pq_sendbyte(&output_message, 'w');
+
+ pq_sendint64(&output_message, startptr); /* dataStart */
+ pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
+ pq_sendint64(&output_message, 0); /* sendtime, filled in last */
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
- XLogRead(output_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+ enlargeStringInfo(&output_message, nbytes);
+ XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+ output_message.len += nbytes;
+ output_message.data[output_message.len] = '\0';
/*
- * We fill the message header last so that the send timestamp is taken as
- * late as possible.
+ * Fill the send timestamp last, so that it is taken as late as possible.
*/
- msghdr.dataStart = startptr;
- msghdr.walEnd = SendRqstPtr;
- msghdr.sendTime = GetCurrentTimestamp();
+ resetStringInfo(&tmpbuf);
+ pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+ memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
+ tmpbuf.data, sizeof(int64));
- memcpy(output_message + 1, &msghdr, sizeof(WalDataMessageHeader));
-
- pq_putmessage_noblock('d', output_message, 1 + sizeof(WalDataMessageHeader) + nbytes);
+ pq_putmessage_noblock('d', output_message.data, output_message.len);
sentPtr = endptr;
static void
WalSndKeepalive(bool requestReply)
{
- PrimaryKeepaliveMessage keepalive_message;
-
- /* Construct a new message */
- keepalive_message.walEnd = sentPtr;
- keepalive_message.sendTime = GetCurrentTimestamp();
- keepalive_message.replyRequested = requestReply;
-
elog(DEBUG2, "sending replication keepalive");
- /* Prepend with the message type and send it. */
- output_message[0] = 'k';
- memcpy(output_message + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
- pq_putmessage_noblock('d', output_message, sizeof(PrimaryKeepaliveMessage) + 1);
+ /* construct the message... */
+ resetStringInfo(&output_message);
+ pq_sendbyte(&output_message, 'k');
+ pq_sendint64(&output_message, sentPtr);
+ pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
+ pq_sendbyte(&output_message, requestReply ? 1 : 0);
+
+ /* ... and send it wrapped in CopyData */
+ pq_putmessage_noblock('d', output_message.data, output_message.len);
}
/*
return result;
}
+/*
+ * GetCurrentIntegerTimestamp -- get the current operating system time as int64
+ *
+ * Result is the number of milliseconds since the Postgres epoch. If compiled
+ * with --enable-integer-datetimes, this is identical to GetCurrentTimestamp(),
+ * and is implemented as a macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+GetCurrentIntegerTimestamp(void)
+{
+ int64 result;
+ struct timeval tp;
+
+ gettimeofday(&tp, NULL);
+
+ result = (int64) tp.tv_sec -
+ ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+ result = (result * USECS_PER_SEC) + tp.tv_usec;
+
+ return result;
+}
+#endif
+
+/*
+ * IntegetTimestampToTimestampTz -- convert an int64 timestamp to native format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+TimestampTz
+IntegerTimestampToTimestampTz(int64 timestamp)
+{
+ TimestampTz result;
+
+ result = timestamp / USECS_PER_SEC;
+ result += (timestamp % USECS_PER_SEC) / 1000000.0;
+
+ return result;
+}
+#endif
+
/*
* TimestampDifference -- convert the difference between two timestamps
* into integer seconds and microseconds
#include "postgres.h"
#include "libpq-fe.h"
#include "access/xlog_internal.h"
-#include "replication/walprotocol.h"
#include "utils/datetime.h"
#include "utils/timestamp.h"
#include <unistd.h>
-/* Size of the streaming replication protocol headers */
-#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader))
-#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage))
-
/* fd for currently open WAL file */
static int walfile = -1;
-
/*
* Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is
/*
* Local version of GetCurrentTimestamp(), since we are not linked with
- * backend code.
+ * backend code. The protocol always uses integer timestamps, regardless of
+ * server setting.
*/
-static TimestampTz
+static int64
localGetCurrentTimestamp(void)
{
- TimestampTz result;
+ int64 result;
struct timeval tp;
gettimeofday(&tp, NULL);
- result = (TimestampTz) tp.tv_sec -
+ result = (int64) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-#ifdef HAVE_INT64_TIMESTAMP
result = (result * USECS_PER_SEC) + tp.tv_usec;
-#else
- result = result + (tp.tv_usec / 1000000.0);
-#endif
return result;
}
/*
- * Local version of TimestampDifference(), since we are not
- * linked with backend code.
+ * Local version of TimestampDifference(), since we are not linked with
+ * backend code.
*/
static void
-localTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
+localTimestampDifference(int64 start_time, int64 stop_time,
long *secs, int *microsecs)
{
- TimestampTz diff = stop_time - start_time;
+ int64 diff = stop_time - start_time;
if (diff <= 0)
{
}
else
{
-#ifdef HAVE_INT64_TIMESTAMP
*secs = (long) (diff / USECS_PER_SEC);
*microsecs = (int) (diff % USECS_PER_SEC);
-#else
- *secs = (long) diff;
- *microsecs = (int) ((diff - *secs) * 1000000.0);
-#endif
}
}
* linked with backend code.
*/
static bool
-localTimestampDifferenceExceeds(TimestampTz start_time,
- TimestampTz stop_time,
+localTimestampDifferenceExceeds(int64 start_time,
+ int64 stop_time,
int msec)
{
- TimestampTz diff = stop_time - start_time;
+ int64 diff = stop_time - start_time;
-#ifdef HAVE_INT64_TIMESTAMP
return (diff >= msec * INT64CONST(1000));
-#else
- return (diff * 1000.0 >= msec);
-#endif
+}
+
+/*
+ * Converts an int64 to network byte order.
+ */
+static void
+sendint64(int64 i, char *buf)
+{
+ uint32 n32;
+
+ /* High order half first, since we're doing MSB-first */
+ n32 = (uint32) (i >> 32);
+ n32 = htonl(n32);
+ memcpy(&buf[0], &n32, 4);
+
+ /* Now the low order half */
+ n32 = (uint32) i;
+ n32 = htonl(n32);
+ memcpy(&buf[4], &n32, 4);
+}
+
+/*
+ * Converts an int64 from network byte order to native format.
+ */
+static int64
+recvint64(char *buf)
+{
+ int64 result;
+ uint32 h32;
+ uint32 l32;
+
+ memcpy(&h32, buf, 4);
+ memcpy(&l32, buf + 4, 4);
+ h32 = ntohl(h32);
+ l32 = ntohl(l32);
+
+ result = h32;
+ result <<= 32;
+ result |= l32;
+
+ return result;
+}
+
+/*
+ * Send a Standby Status Update message to server.
+ */
+static bool
+sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
+{
+ char replybuf[1 + 8 + 8 + 8 + 8 + 1];
+ int len = 0;
+
+ replybuf[len] = 'r';
+ len += 1;
+ sendint64(blockpos, &replybuf[len]); /* write */
+ len += 8;
+ sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
+ len += 8;
+ sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
+ len += 8;
+ sendint64(now, &replybuf[len]); /* sendTime */
+ len += 8;
+ replybuf[len] = 0; /* replyRequested */
+ len += 1;
+
+ if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send feedback packet: %s"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+
+ return true;
}
/*
standby_message_timeout))
{
/* Time to send feedback! */
- char replybuf[sizeof(StandbyReplyMessage) + 1];
- StandbyReplyMessage *replymsg;
-
- replymsg = (StandbyReplyMessage *) (replybuf + 1);
- replymsg->write = blockpos;
- replymsg->flush = InvalidXLogRecPtr;
- replymsg->apply = InvalidXLogRecPtr;
- replymsg->sendTime = now;
- replybuf[0] = 'r';
-
- if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
- PQflush(conn))
- {
- fprintf(stderr, _("%s: could not send feedback packet: %s"),
- progname, PQerrorMessage(conn));
+ if (!sendFeedback(conn, blockpos, now))
goto error;
- }
-
last_status = now;
}
FD_SET(PQsocket(conn), &input_mask);
if (standby_message_timeout)
{
- TimestampTz targettime;
+ int64 targettime;
long secs;
int usecs;
- targettime = TimestampTzPlusMilliseconds(last_status,
- standby_message_timeout - 1);
+ targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
localTimestampDifference(now,
targettime,
&secs,
progname, PQerrorMessage(conn));
goto error;
}
+
+ /* Check the message type. */
if (copybuf[0] == 'k')
{
/*
* keepalive message, sent in 9.2 and newer. We just ignore this
* message completely, but need to skip past it in the stream.
*/
- if (r != STREAMING_KEEPALIVE_SIZE)
- {
- fprintf(stderr,
- _("%s: keepalive message has incorrect size %d\n"),
- progname, r);
- goto error;
- }
continue;
}
else if (copybuf[0] != 'w')
progname, copybuf[0]);
goto error;
}
+
+ /*
+ * Read the header of the XLogData message, enclosed in the CopyData
+ * message. We only need the WAL location field (dataStart), the rest
+ * of the header is ignored.
+ */
+#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */)
if (r < STREAMING_HEADER_SIZE + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
goto error;
}
+ blockpos = recvint64(©buf[1]);
/* Extract WAL location for this block */
- memcpy(&blockpos, copybuf + 1, 8);
xlogoff = blockpos % XLOG_SEG_SIZE;
/*
+++ /dev/null
-/*-------------------------------------------------------------------------
- *
- * walprotocol.h
- * Definitions relevant to the streaming WAL transmission protocol.
- *
- * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
- *
- * src/include/replication/walprotocol.h
- *
- *-------------------------------------------------------------------------
- */
-#ifndef _WALPROTOCOL_H
-#define _WALPROTOCOL_H
-
-#include "access/xlogdefs.h"
-#include "datatype/timestamp.h"
-
-
-/*
- * All messages from WalSender must contain these fields to allow us to
- * correctly calculate the replication delay.
- */
-typedef struct
-{
- /* Current end of WAL on the sender */
- XLogRecPtr walEnd;
-
- /* Sender's system clock at the time of transmission */
- TimestampTz sendTime;
-
- /*
- * If replyRequested is set, the client should reply immediately to this
- * message, to avoid a timeout disconnect.
- */
- bool replyRequested;
-} WalSndrMessage;
-
-
-/*
- * Header for a WAL data message (message type 'w'). This is wrapped within
- * a CopyData message at the FE/BE protocol level.
- *
- * The header is followed by actual WAL data. Note that the data length is
- * not specified in the header --- it's just whatever remains in the message.
- *
- * walEnd and sendTime are not essential data, but are provided in case
- * the receiver wants to adjust its behavior depending on how far behind
- * it is.
- */
-typedef struct
-{
- /* WAL start location of the data included in this message */
- XLogRecPtr dataStart;
-
- /* Current end of WAL on the sender */
- XLogRecPtr walEnd;
-
- /* Sender's system clock at the time of transmission */
- TimestampTz sendTime;
-} WalDataMessageHeader;
-
-/*
- * Keepalive message from primary (message type 'k'). (lowercase k)
- * This is wrapped within a CopyData message at the FE/BE protocol level.
- *
- * Note that the data length is not specified here.
- */
-typedef WalSndrMessage PrimaryKeepaliveMessage;
-
-/*
- * Reply message from standby (message type 'r'). This is wrapped within
- * a CopyData message at the FE/BE protocol level.
- *
- * Note that the data length is not specified here.
- */
-typedef struct
-{
- /*
- * The xlog locations that have been written, flushed, and applied by
- * standby-side. These may be invalid if the standby-side is unable to or
- * chooses not to report these.
- */
- XLogRecPtr write;
- XLogRecPtr flush;
- XLogRecPtr apply;
-
- /* Sender's system clock at the time of transmission */
- TimestampTz sendTime;
-
- /*
- * If replyRequested is set, the server should reply immediately to this
- * message, to avoid a timeout disconnect.
- */
- bool replyRequested;
-} StandbyReplyMessage;
-
-/*
- * Hot Standby feedback from standby (message type 'h'). This is wrapped within
- * a CopyData message at the FE/BE protocol level.
- *
- * Note that the data length is not specified here.
- */
-typedef struct
-{
- /*
- * The current xmin and epoch from the standby, for Hot Standby feedback.
- * This may be invalid if the standby-side does not support feedback, or
- * Hot Standby is not yet available.
- */
- TransactionId xmin;
- uint32 epoch;
-
- /* Sender's system clock at the time of transmission */
- TimestampTz sendTime;
-} StandbyHSFeedbackMessage;
-
-/*
- * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
- *
- * We don't have a good idea of what a good value would be; there's some
- * overhead per message in both walsender and walreceiver, but on the other
- * hand sending large batches makes walsender less responsive to signals
- * because signals are checked only between messages. 128kB (with
- * default 8k blocks) seems like a reasonable guess for now.
- */
-#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
-
-#endif /* _WALPROTOCOL_H */
/* Internal routines (not fmgr-callable) */
extern TimestampTz GetCurrentTimestamp(void);
-
extern void TimestampDifference(TimestampTz start_time, TimestampTz stop_time,
long *secs, int *microsecs);
extern bool TimestampDifferenceExceeds(TimestampTz start_time,
TimestampTz stop_time,
int msec);
+/*
+ * Prototypes for functions to deal with integer timestamps, when the native
+ * format is float timestamps.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+extern int64 GetCurrentIntegerTimestamp(void);
+extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+#else
+#define GetCurrentIntegerTimestamp() GetCurrentTimestamp()
+#define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#endif
+
extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
extern pg_time_t timestamptz_to_time_t(TimestampTz t);