]> granicus.if.org Git - postgresql/commitdiff
Make the streaming replication protocol messages architecture-independent.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 7 Nov 2012 16:59:12 +0000 (18:59 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 7 Nov 2012 17:09:13 +0000 (19:09 +0200)
We used to send structs wrapped in CopyData messages, which works as long as
the client and server agree on things like endianess, timestamp format and
alignment. That's good enough for running a standby server, which has to run
on the same platform anyway, but it's useful for tools like pg_receivexlog
to work across platforms.

This breaks protocol compatibility of streaming replication, but we never
promised that to be compatible across versions, anyway.

doc/src/sgml/protocol.sgml
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/backend/utils/adt/timestamp.c
src/bin/pg_basebackup/receivelog.c
src/include/replication/walprotocol.h [deleted file]
src/include/utils/timestamp.h

index 3d72a162ebf9c28396796cbc69011d2a33e300d3..f87020c9099b92e697099a2db2d26aeab38fa92c 100644 (file)
@@ -1359,14 +1359,18 @@ The commands accepted in walsender mode are:
       has already been recycled. On success, server responds with a
       CopyBothResponse message, and then starts to stream WAL to the frontend.
       WAL will continue to be streamed until the connection is broken;
-      no further commands will be accepted.
+      no further commands will be accepted. If the WAL sender process is
+      terminated normally (during postmaster shutdown), it will send a
+      CommandComplete message before exiting. This might not happen during an
+      abnormal shutdown, of course.
      </para>
 
      <para>
       WAL data is sent as a series of CopyData messages.  (This allows
       other information to be intermixed; in particular the server can send
       an ErrorResponse message if it encounters a failure after beginning
-      to stream.)  The payload in each CopyData message follows this format:
+      to stream.)  The payload of each CopyData message from server to the
+      client contains a message of one of the following formats:
      </para>
 
      <para>
@@ -1390,34 +1394,32 @@ The commands accepted in walsender mode are:
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
-          The starting point of the WAL data in this message, given in
-          XLogRecPtr format.
+          The starting point of the WAL data in this message.
       </para>
       </listitem>
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
-          The current end of WAL on the server, given in
-          XLogRecPtr format.
+          The current end of WAL on the server.
       </para>
       </listitem>
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
-          The server's system clock at the time of transmission,
-          given in TimestampTz format.
+          The server's system clock at the time of transmission, as
+          microseconds since midnight on 2000-01-01.
       </para>
       </listitem>
       </varlistentry>
@@ -1429,42 +1431,19 @@ The commands accepted in walsender mode are:
       <para>
           A section of the WAL data stream.
       </para>
+      <para>
+          A single WAL record is never split across two XLogData messages.
+          When a WAL record crosses a WAL page boundary, and is therefore
+          already split using continuation records, it can be split at the page
+          boundary. In other words, the first main WAL record and its
+          continuation records can be sent in different XLogData messages.
+      </para>
       </listitem>
       </varlistentry>
       </variablelist>
       </para>
       </listitem>
       </varlistentry>
-      </variablelist>
-     </para>
-     <para>
-       A single WAL record is never split across two CopyData messages.
-       When a WAL record crosses a WAL page boundary, and is therefore
-       already split using continuation records, it can be split at the page
-       boundary. In other words, the first main WAL record and its
-       continuation records can be sent in different CopyData messages.
-     </para>
-     <para>
-       Note that all fields within the WAL data and the above-described header
-       will be in the sending server's native format.  Endianness, and the
-       format for the timestamp, are unpredictable unless the receiver has
-       verified that the sender's system identifier matches its own
-       <filename>pg_control</> contents.
-     </para>
-     <para>
-       If the WAL sender process is terminated normally (during postmaster
-       shutdown), it will send a CommandComplete message before exiting.
-       This might not happen during an abnormal shutdown, of course.
-     </para>
-
-     <para>
-       The receiving process can send replies back to the sender at any time,
-       using one of the following message formats (also in the payload of a
-       CopyData message):
-     </para>
-
-     <para>
-      <variablelist>
       <varlistentry>
       <term>
           Primary keepalive message (B)
@@ -1484,23 +1463,33 @@ The commands accepted in walsender mode are:
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
-          The current end of WAL on the server, given in
-          XLogRecPtr format.
+          The current end of WAL on the server.
       </para>
       </listitem>
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
-          The server's system clock at the time of transmission,
-          given in TimestampTz format.
+          The server's system clock at the time of transmission, as
+          microseconds since midnight on 2000-01-01.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte1
+      </term>
+      <listitem>
+      <para>
+          1 means that the client should reply to this message as soon as
+          possible, to avoid a timeout disconnect. 0 otherwise.
       </para>
       </listitem>
       </varlistentry>
@@ -1511,6 +1500,12 @@ The commands accepted in walsender mode are:
       </variablelist>
      </para>
 
+     <para>
+       The receiving process can send replies back to the sender at any time,
+       using one of the following message formats (also in the payload of a
+       CopyData message):
+     </para>
+
      <para>
       <variablelist>
       <varlistentry>
@@ -1532,45 +1527,56 @@ The commands accepted in walsender mode are:
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
           The location of the last WAL byte + 1 received and written to disk
-          in the standby, in XLogRecPtr format.
+          in the standby.
       </para>
       </listitem>
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
           The location of the last WAL byte + 1 flushed to disk in
-          the standby, in XLogRecPtr format.
+          the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int64
+      </term>
+      <listitem>
+      <para>
+          The location of the last WAL byte + 1 applied in the standby.
       </para>
       </listitem>
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
-          The location of the last WAL byte + 1 applied in the standby, in
-          XLogRecPtr format.
+          The client's system clock at the time of transmission, as
+          microseconds since midnight on 2000-01-01.
       </para>
       </listitem>
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Byte1
       </term>
       <listitem>
       <para>
-          The server's system clock at the time of transmission,
-          given in TimestampTz format.
+          If 1, the client requests the server to reply to this message
+          immediately. This can be used to ping the server, to test if
+          the connection is still healthy.
       </para>
       </listitem>
       </varlistentry>
@@ -1602,28 +1608,29 @@ The commands accepted in walsender mode are:
       </varlistentry>
       <varlistentry>
       <term>
-          Byte8
+          Int64
       </term>
       <listitem>
       <para>
-          The server's system clock at the time of transmission,
-          given in TimestampTz format.
+          The client's system clock at the time of transmission, as
+          microseconds since midnight on 2000-01-01.
       </para>
       </listitem>
       </varlistentry>
       <varlistentry>
       <term>
-          Byte4
+          Int32
       </term>
       <listitem>
       <para>
-          The standby's current xmin.
+          The standby's current xmin. This may be 0, if the standby does not
+          support feedback, or is not yet in Hot Standby state.
       </para>
       </listitem>
       </varlistentry>
       <varlistentry>
       <term>
-          Byte4
+          Int32
       </term>
       <listitem>
       <para>
index b1accdcceaf701e43233d24651c14da91e80e92e..62135037f104837b154cf7293d51418c08e1f660 100644 (file)
@@ -39,9 +39,9 @@
 #include <unistd.h>
 
 #include "access/xlog_internal.h"
+#include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
-#include "replication/walprotocol.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/ipc.h"
@@ -93,8 +93,8 @@ static struct
        XLogRecPtr      Flush;                  /* last byte + 1 flushed in the standby */
 }      LogstreamResult;
 
-static StandbyReplyMessage reply_message;
-static StandbyHSFeedbackMessage feedback_message;
+static StringInfoData  reply_message;
+static StringInfoData  incoming_message;
 
 /*
  * About SIGTERM handling:
@@ -279,10 +279,10 @@ WalReceiverMain(void)
        walrcv_connect(conninfo, startpoint);
        DisableWalRcvImmediateExit();
 
-       /* Initialize LogstreamResult, reply_message and feedback_message */
+       /* Initialize LogstreamResult and buffers for processing messages */
        LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
-       MemSet(&reply_message, 0, sizeof(reply_message));
-       MemSet(&feedback_message, 0, sizeof(feedback_message));
+       initStringInfo(&reply_message);
+       initStringInfo(&incoming_message);
 
        /* Initialize the last recv timestamp */
        last_recv_timestamp = GetCurrentTimestamp();
@@ -480,41 +480,58 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
 static void
 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 {
+       int                     hdrlen;
+       XLogRecPtr      dataStart;
+       XLogRecPtr      walEnd;
+       TimestampTz     sendTime;
+       bool            replyRequested;
+
+       resetStringInfo(&incoming_message);
+
        switch (type)
        {
                case 'w':                               /* WAL records */
                        {
-                               WalDataMessageHeader msghdr;
-
-                               if (len < sizeof(WalDataMessageHeader))
+                               /* copy message to StringInfo */
+                               hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
+                               if (len < hdrlen)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                         errmsg_internal("invalid WAL message received from primary")));
-                               /* memcpy is required here for alignment reasons */
-                               memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
-
-                               ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
-
-                               buf += sizeof(WalDataMessageHeader);
-                               len -= sizeof(WalDataMessageHeader);
-                               XLogWalRcvWrite(buf, len, msghdr.dataStart);
+                               appendBinaryStringInfo(&incoming_message, buf, hdrlen);
+
+                               /* read the fields */
+                               dataStart = pq_getmsgint64(&incoming_message);
+                               walEnd = pq_getmsgint64(&incoming_message);
+                               sendTime = IntegerTimestampToTimestampTz(
+                                       pq_getmsgint64(&incoming_message));
+                               ProcessWalSndrMessage(walEnd, sendTime);
+
+                               buf += hdrlen;
+                               len -= hdrlen;
+                               XLogWalRcvWrite(buf, len, dataStart);
                                break;
                        }
                case 'k':                               /* Keepalive */
                        {
-                               PrimaryKeepaliveMessage keepalive;
-
-                               if (len != sizeof(PrimaryKeepaliveMessage))
+                               /* copy message to StringInfo */
+                               hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
+                               if (len != hdrlen)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                         errmsg_internal("invalid keepalive message received from primary")));
-                               /* memcpy is required here for alignment reasons */
-                               memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
+                               appendBinaryStringInfo(&incoming_message, buf, hdrlen);
 
-                               ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+                               /* read the fields */
+                               walEnd = pq_getmsgint64(&incoming_message);
+                               sendTime = IntegerTimestampToTimestampTz(
+                                       pq_getmsgint64(&incoming_message));
+                               replyRequested = pq_getmsgbyte(&incoming_message);
+
+                               ProcessWalSndrMessage(walEnd, sendTime);
 
                                /* If the primary requested a reply, send one immediately */
-                               if (keepalive.replyRequested)
+                               if (replyRequested)
                                        XLogWalRcvSendReply(true, false);
                                break;
                        }
@@ -685,7 +702,10 @@ XLogWalRcvFlush(bool dying)
 static void
 XLogWalRcvSendReply(bool force, bool requestReply)
 {
-       char            buf[sizeof(StandbyReplyMessage) + 1];
+       static XLogRecPtr writePtr = 0;
+       static XLogRecPtr flushPtr = 0;
+       XLogRecPtr      applyPtr;
+       static TimestampTz sendTime = 0;
        TimestampTz now;
 
        /*
@@ -708,28 +728,34 @@ XLogWalRcvSendReply(bool force, bool requestReply)
         * probably OK.
         */
        if (!force
-               && XLByteEQ(reply_message.write, LogstreamResult.Write)
-               && XLByteEQ(reply_message.flush, LogstreamResult.Flush)
-               && !TimestampDifferenceExceeds(reply_message.sendTime, now,
+               && XLByteEQ(writePtr, LogstreamResult.Write)
+               && XLByteEQ(flushPtr, LogstreamResult.Flush)
+               && !TimestampDifferenceExceeds(sendTime, now,
                                                                           wal_receiver_status_interval * 1000))
                return;
+       sendTime = now;
 
        /* Construct a new message */
-       reply_message.write = LogstreamResult.Write;
-       reply_message.flush = LogstreamResult.Flush;
-       reply_message.apply = GetXLogReplayRecPtr(NULL);
-       reply_message.sendTime = now;
-       reply_message.replyRequested = requestReply;
-
-       elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
-                (uint32) (reply_message.write >> 32), (uint32) reply_message.write,
-                (uint32) (reply_message.flush >> 32), (uint32) reply_message.flush,
-                (uint32) (reply_message.apply >> 32), (uint32) reply_message.apply);
-
-       /* Prepend with the message type and send it. */
-       buf[0] = 'r';
-       memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
-       walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
+       writePtr = LogstreamResult.Write;
+       flushPtr = LogstreamResult.Flush;
+       applyPtr = GetXLogReplayRecPtr(NULL);
+
+       resetStringInfo(&reply_message);
+       pq_sendbyte(&reply_message, 'r');
+       pq_sendint64(&reply_message, writePtr);
+       pq_sendint64(&reply_message, flushPtr);
+       pq_sendint64(&reply_message, applyPtr);
+       pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+       pq_sendbyte(&reply_message, requestReply ? 1 : 0);
+
+       /* Send it */
+       elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
+                (uint32) (writePtr >> 32), (uint32) writePtr,
+                (uint32) (flushPtr >> 32), (uint32) flushPtr,
+                (uint32) (applyPtr >> 32), (uint32) applyPtr,
+                requestReply ? " (reply requested)" : "");
+
+       walrcv_send(reply_message.data, reply_message.len);
 }
 
 /*
@@ -739,11 +765,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 static void
 XLogWalRcvSendHSFeedback(void)
 {
-       char            buf[sizeof(StandbyHSFeedbackMessage) + 1];
        TimestampTz now;
        TransactionId nextXid;
        uint32          nextEpoch;
        TransactionId xmin;
+       static TimestampTz sendTime = 0;
 
        /*
         * If the user doesn't want status to be reported to the master, be sure
@@ -758,9 +784,10 @@ XLogWalRcvSendHSFeedback(void)
        /*
         * Send feedback at most once per wal_receiver_status_interval.
         */
-       if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
+       if (!TimestampDifferenceExceeds(sendTime, now,
                                                                        wal_receiver_status_interval * 1000))
                return;
+       sendTime = now;
 
        /*
         * If Hot Standby is not yet active there is nothing to send. Check this
@@ -783,25 +810,23 @@ XLogWalRcvSendHSFeedback(void)
        if (nextXid < xmin)
                nextEpoch--;
 
-       /*
-        * Always send feedback message.
-        */
-       feedback_message.sendTime = now;
-       feedback_message.xmin = xmin;
-       feedback_message.epoch = nextEpoch;
-
        elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-                feedback_message.xmin,
-                feedback_message.epoch);
-
-       /* Prepend with the message type and send it. */
-       buf[0] = 'h';
-       memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
-       walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
+                xmin, nextEpoch);
+
+       /* Construct the the message and send it. */
+       resetStringInfo(&reply_message);
+       pq_sendbyte(&reply_message, 'h');
+       pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+       pq_sendint(&reply_message, xmin, 4);
+       pq_sendint(&reply_message, nextEpoch, 4);
+       walrcv_send(reply_message.data, reply_message.len);
 }
 
 /*
- * Keep track of important messages from primary.
+ * Update shared memory status upon receiving a message from primary.
+ *
+ * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
+ * message, reported by primary.
  */
 static void
 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
index 2af38f1cbeae428bce044f1374d4c10f0508cf80..8774d7e8229edc34634bf4bcf7e4b5469e3fe5ad 100644 (file)
@@ -48,7 +48,6 @@
 #include "nodes/replnodes.h"
 #include "replication/basebackup.h"
 #include "replication/syncrep.h"
-#include "replication/walprotocol.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/walsender_private.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
 
+/*
+ * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
+ *
+ * We don't have a good idea of what a good value would be; there's some
+ * overhead per message in both walsender and walreceiver, but on the other
+ * hand sending large batches makes walsender less responsive to signals
+ * because signals are checked only between messages.  128kB (with
+ * default 8k blocks) seems like a reasonable guess for now.
+ */
+#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
 
 /* Array of WalSnds in shared memory */
 WalSndCtlData *WalSndCtl = NULL;
@@ -103,13 +112,10 @@ static uint32 sendOff = 0;
  */
 static XLogRecPtr sentPtr = 0;
 
-/* Buffer for processing reply messages. */
+/* Buffers for constructing outgoing messages and processing reply messages. */
+static StringInfoData output_message;
 static StringInfoData reply_message;
-/*
- * Buffer for constructing outgoing messages.
- * (1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE bytes)
- */
-static char *output_message;
+static StringInfoData tmpbuf;
 
 /*
  * Timestamp of the last receipt of the reply from the standby.
@@ -526,17 +532,26 @@ ProcessStandbyMessage(void)
 static void
 ProcessStandbyReplyMessage(void)
 {
-       StandbyReplyMessage reply;
-
-       pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
-
-       elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
-                (uint32) (reply.write >> 32), (uint32) reply.write,
-                (uint32) (reply.flush >> 32), (uint32) reply.flush,
-                (uint32) (reply.apply >> 32), (uint32) reply.apply);
+       XLogRecPtr      writePtr,
+                               flushPtr,
+                               applyPtr;
+       bool            replyRequested;
+
+       /* the caller already consumed the msgtype byte */
+       writePtr = pq_getmsgint64(&reply_message);
+       flushPtr = pq_getmsgint64(&reply_message);
+       applyPtr = pq_getmsgint64(&reply_message);
+       (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
+       replyRequested = pq_getmsgbyte(&reply_message);
+
+       elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
+                (uint32) (writePtr >> 32), (uint32) writePtr,
+                (uint32) (flushPtr >> 32), (uint32) flushPtr,
+                (uint32) (applyPtr >> 32), (uint32) applyPtr,
+                replyRequested ? " (reply requested)" : "");
 
        /* Send a reply if the standby requested one. */
-       if (reply.replyRequested)
+       if (replyRequested)
                WalSndKeepalive(false);
 
        /*
@@ -548,9 +563,9 @@ ProcessStandbyReplyMessage(void)
                volatile WalSnd *walsnd = MyWalSnd;
 
                SpinLockAcquire(&walsnd->mutex);
-               walsnd->write = reply.write;
-               walsnd->flush = reply.flush;
-               walsnd->apply = reply.apply;
+               walsnd->write = writePtr;
+               walsnd->flush = flushPtr;
+               walsnd->apply = applyPtr;
                SpinLockRelease(&walsnd->mutex);
        }
 
@@ -564,20 +579,25 @@ ProcessStandbyReplyMessage(void)
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-       StandbyHSFeedbackMessage reply;
        TransactionId nextXid;
        uint32          nextEpoch;
+       TransactionId feedbackXmin;
+       uint32          feedbackEpoch;
 
-       /* Decipher the reply message */
-       pq_copymsgbytes(&reply_message, (char *) &reply,
-                                       sizeof(StandbyHSFeedbackMessage));
+       /*
+        * Decipher the reply message. The caller already consumed the msgtype
+        * byte.
+        */
+       (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
+       feedbackXmin = pq_getmsgint(&reply_message, 4);
+       feedbackEpoch = pq_getmsgint(&reply_message, 4);
 
        elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
-                reply.xmin,
-                reply.epoch);
+                feedbackXmin,
+                feedbackEpoch);
 
        /* Ignore invalid xmin (can't actually happen with current walreceiver) */
-       if (!TransactionIdIsNormal(reply.xmin))
+       if (!TransactionIdIsNormal(feedbackXmin))
                return;
 
        /*
@@ -589,18 +609,18 @@ ProcessStandbyHSFeedbackMessage(void)
         */
        GetNextXidAndEpoch(&nextXid, &nextEpoch);
 
-       if (reply.xmin <= nextXid)
+       if (feedbackXmin <= nextXid)
        {
-               if (reply.epoch != nextEpoch)
+               if (feedbackEpoch != nextEpoch)
                        return;
        }
        else
        {
-               if (reply.epoch + 1 != nextEpoch)
+               if (feedbackEpoch + 1 != nextEpoch)
                        return;
        }
 
-       if (!TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
+       if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
                return;                                 /* epoch OK, but it's wrapped around */
 
        /*
@@ -610,9 +630,9 @@ ProcessStandbyHSFeedbackMessage(void)
         * cleanup conflicts on the standby server.
         *
         * There is a small window for a race condition here: although we just
-        * checked that reply.xmin precedes nextXid, the nextXid could have gotten
+        * checked that feedbackXmin precedes nextXid, the nextXid could have gotten
         * advanced between our fetching it and applying the xmin below, perhaps
-        * far enough to make reply.xmin wrap around.  In that case the xmin we
+        * far enough to make feedbackXmin wrap around.  In that case the xmin we
         * set here would be "in the future" and have no effect.  No point in
         * worrying about this since it's too late to save the desired data
         * anyway.      Assuming that the standby sends us an increasing sequence of
@@ -625,7 +645,7 @@ ProcessStandbyHSFeedbackMessage(void)
         * safe, and if we're moving it backwards, well, the data is at risk
         * already since a VACUUM could have just finished calling GetOldestXmin.)
         */
-       MyPgXact->xmin = reply.xmin;
+       MyPgXact->xmin = feedbackXmin;
 }
 
 /* Main loop of walsender process that streams the WAL over Copy messages. */
@@ -635,17 +655,12 @@ WalSndLoop(void)
        bool            caughtup = false;
 
        /*
-        * Allocate buffer that will be used for each output message.  We do this
-        * just once to reduce palloc overhead.  The buffer must be made large
-        * enough for maximum-sized messages.
-        */
-       output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
-
-       /*
-        * Allocate buffer that will be used for processing reply messages.  As
-        * above, do this just once to reduce palloc overhead.
+        * Allocate buffers that will be used for each outgoing and incoming
+        * message.  We do this just once to reduce palloc overhead.
         */
+       initStringInfo(&output_message);
        initStringInfo(&reply_message);
+       initStringInfo(&tmpbuf);
 
        /* Initialize the last reply timestamp */
        last_reply_timestamp = GetCurrentTimestamp();
@@ -1048,7 +1063,6 @@ XLogSend(bool *caughtup)
        XLogRecPtr      startptr;
        XLogRecPtr      endptr;
        Size            nbytes;
-       WalDataMessageHeader msghdr;
 
        /*
         * Attempt to send all data that's already been written out and fsync'd to
@@ -1125,25 +1139,31 @@ XLogSend(bool *caughtup)
        /*
         * OK to read and send the slice.
         */
-       output_message[0] = 'w';
+       resetStringInfo(&output_message);
+       pq_sendbyte(&output_message, 'w');
+
+       pq_sendint64(&output_message, startptr);        /* dataStart */
+       pq_sendint64(&output_message, SendRqstPtr);     /* walEnd */
+       pq_sendint64(&output_message, 0);                       /* sendtime, filled in last */
 
        /*
         * Read the log directly into the output buffer to avoid extra memcpy
         * calls.
         */
-       XLogRead(output_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+       enlargeStringInfo(&output_message, nbytes);
+       XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+       output_message.len += nbytes;
+       output_message.data[output_message.len] = '\0';
 
        /*
-        * We fill the message header last so that the send timestamp is taken as
-        * late as possible.
+        * Fill the send timestamp last, so that it is taken as late as possible.
         */
-       msghdr.dataStart = startptr;
-       msghdr.walEnd = SendRqstPtr;
-       msghdr.sendTime = GetCurrentTimestamp();
+       resetStringInfo(&tmpbuf);
+       pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+       memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
+                  tmpbuf.data, sizeof(int64));
 
-       memcpy(output_message + 1, &msghdr, sizeof(WalDataMessageHeader));
-
-       pq_putmessage_noblock('d', output_message, 1 + sizeof(WalDataMessageHeader) + nbytes);
+       pq_putmessage_noblock('d', output_message.data, output_message.len);
 
        sentPtr = endptr;
 
@@ -1518,19 +1538,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 static void
 WalSndKeepalive(bool requestReply)
 {
-       PrimaryKeepaliveMessage keepalive_message;
-
-       /* Construct a new message */
-       keepalive_message.walEnd = sentPtr;
-       keepalive_message.sendTime = GetCurrentTimestamp();
-       keepalive_message.replyRequested = requestReply;
-
        elog(DEBUG2, "sending replication keepalive");
 
-       /* Prepend with the message type and send it. */
-       output_message[0] = 'k';
-       memcpy(output_message + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
-       pq_putmessage_noblock('d', output_message, sizeof(PrimaryKeepaliveMessage) + 1);
+       /* construct the message... */
+       resetStringInfo(&output_message);
+       pq_sendbyte(&output_message, 'k');
+       pq_sendint64(&output_message, sentPtr);
+       pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
+       pq_sendbyte(&output_message, requestReply ? 1 : 0);
+
+       /* ... and send it wrapped in CopyData */
+       pq_putmessage_noblock('d', output_message.data, output_message.len);
 }
 
 /*
index 50ef8976bed9099efb9cd296ee570ffbadd00c83..6ff7385233ebbccf8ab3f0023522b9fdae34bf8d 100644 (file)
@@ -1285,6 +1285,50 @@ GetCurrentTimestamp(void)
        return result;
 }
 
+/*
+ * GetCurrentIntegerTimestamp -- get the current operating system time as int64
+ *
+ * Result is the number of milliseconds since the Postgres epoch. If compiled
+ * with --enable-integer-datetimes, this is identical to GetCurrentTimestamp(),
+ * and is implemented as a macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+GetCurrentIntegerTimestamp(void)
+{
+       int64 result;
+       struct timeval tp;
+
+       gettimeofday(&tp, NULL);
+
+       result = (int64) tp.tv_sec -
+               ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+       result = (result * USECS_PER_SEC) + tp.tv_usec;
+
+       return result;
+}
+#endif
+
+/*
+ * IntegetTimestampToTimestampTz -- convert an int64 timestamp to native format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+TimestampTz
+IntegerTimestampToTimestampTz(int64 timestamp)
+{
+       TimestampTz result;
+
+       result = timestamp / USECS_PER_SEC;
+       result += (timestamp % USECS_PER_SEC) / 1000000.0;
+
+       return result;
+}
+#endif
+
 /*
  * TimestampDifference -- convert the difference between two timestamps
  *             into integer seconds and microseconds
index 404ff9171508bd4ebe25705166cf708329b5ebef..aed90954e6c6efe173a25d5bda4095e3e51112f1 100644 (file)
@@ -21,7 +21,6 @@
 #include "postgres.h"
 #include "libpq-fe.h"
 #include "access/xlog_internal.h"
-#include "replication/walprotocol.h"
 #include "utils/datetime.h"
 #include "utils/timestamp.h"
 
 #include <unistd.h>
 
 
-/* Size of the streaming replication protocol headers */
-#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader))
-#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage))
-
 /* fd for currently open WAL file */
 static int     walfile = -1;
 
-
 /*
  * Open a new WAL file in the specified directory. Store the name
  * (not including the full directory) in namebuf. Assumes there is
@@ -189,37 +183,34 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
 
 /*
  * Local version of GetCurrentTimestamp(), since we are not linked with
- * backend code.
+ * backend code. The protocol always uses integer timestamps, regardless of
+ * server setting.
  */
-static TimestampTz
+static int64
 localGetCurrentTimestamp(void)
 {
-       TimestampTz result;
+       int64 result;
        struct timeval tp;
 
        gettimeofday(&tp, NULL);
 
-       result = (TimestampTz) tp.tv_sec -
+       result = (int64) tp.tv_sec -
                ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
 
-#ifdef HAVE_INT64_TIMESTAMP
        result = (result * USECS_PER_SEC) + tp.tv_usec;
-#else
-       result = result + (tp.tv_usec / 1000000.0);
-#endif
 
        return result;
 }
 
 /*
- * Local version of TimestampDifference(), since we are not
- * linked with backend code.
+ * Local version of TimestampDifference(), since we are not linked with
+ * backend code.
  */
 static void
-localTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
+localTimestampDifference(int64 start_time, int64 stop_time,
                                                 long *secs, int *microsecs)
 {
-       TimestampTz diff = stop_time - start_time;
+       int64 diff = stop_time - start_time;
 
        if (diff <= 0)
        {
@@ -228,13 +219,8 @@ localTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
        }
        else
        {
-#ifdef HAVE_INT64_TIMESTAMP
                *secs = (long) (diff / USECS_PER_SEC);
                *microsecs = (int) (diff % USECS_PER_SEC);
-#else
-               *secs = (long) diff;
-               *microsecs = (int) ((diff - *secs) * 1000000.0);
-#endif
        }
 }
 
@@ -243,17 +229,86 @@ localTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
  * linked with backend code.
  */
 static bool
-localTimestampDifferenceExceeds(TimestampTz start_time,
-                                                               TimestampTz stop_time,
+localTimestampDifferenceExceeds(int64 start_time,
+                                                               int64 stop_time,
                                                                int msec)
 {
-       TimestampTz diff = stop_time - start_time;
+       int64 diff = stop_time - start_time;
 
-#ifdef HAVE_INT64_TIMESTAMP
        return (diff >= msec * INT64CONST(1000));
-#else
-       return (diff * 1000.0 >= msec);
-#endif
+}
+
+/*
+ * Converts an int64 to network byte order.
+ */
+static void
+sendint64(int64 i, char *buf)
+{
+       uint32          n32;
+
+       /* High order half first, since we're doing MSB-first */
+       n32 = (uint32) (i >> 32);
+       n32 = htonl(n32);
+       memcpy(&buf[0], &n32, 4);
+
+       /* Now the low order half */
+       n32 = (uint32) i;
+       n32 = htonl(n32);
+       memcpy(&buf[4], &n32, 4);
+}
+
+/*
+ * Converts an int64 from network byte order to native format.
+ */
+static int64
+recvint64(char *buf)
+{
+       int64           result;
+       uint32          h32;
+       uint32          l32;
+
+       memcpy(&h32, buf, 4);
+       memcpy(&l32, buf + 4, 4);
+       h32 = ntohl(h32);
+       l32 = ntohl(l32);
+
+       result = h32;
+       result <<= 32;
+       result |= l32;
+
+       return result;
+}
+
+/*
+ * Send a Standby Status Update message to server.
+ */
+static bool
+sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
+{
+       char            replybuf[1 + 8 + 8 + 8 + 8 + 1];
+       int             len = 0;
+
+       replybuf[len] = 'r';
+       len += 1;
+       sendint64(blockpos, &replybuf[len]);                    /* write */
+       len += 8;
+       sendint64(InvalidXLogRecPtr, &replybuf[len]);   /* flush */
+       len += 8;
+       sendint64(InvalidXLogRecPtr, &replybuf[len]);   /* apply */
+       len += 8;
+       sendint64(now, &replybuf[len]);                                 /* sendTime */
+       len += 8;
+       replybuf[len] = 0;                                                              /* replyRequested */
+       len += 1;
+
+       if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
+       {
+               fprintf(stderr, _("%s: could not send feedback packet: %s"),
+                               progname, PQerrorMessage(conn));
+               return false;
+       }
+
+       return true;
 }
 
 /*
@@ -382,24 +437,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                                                                        standby_message_timeout))
                {
                        /* Time to send feedback! */
-                       char            replybuf[sizeof(StandbyReplyMessage) + 1];
-                       StandbyReplyMessage *replymsg;
-
-                       replymsg = (StandbyReplyMessage *) (replybuf + 1);
-                       replymsg->write = blockpos;
-                       replymsg->flush = InvalidXLogRecPtr;
-                       replymsg->apply = InvalidXLogRecPtr;
-                       replymsg->sendTime = now;
-                       replybuf[0] = 'r';
-
-                       if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
-                               PQflush(conn))
-                       {
-                               fprintf(stderr, _("%s: could not send feedback packet: %s"),
-                                               progname, PQerrorMessage(conn));
+                       if (!sendFeedback(conn, blockpos, now))
                                goto error;
-                       }
-
                        last_status = now;
                }
 
@@ -419,12 +458,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        FD_SET(PQsocket(conn), &input_mask);
                        if (standby_message_timeout)
                        {
-                               TimestampTz targettime;
+                               int64           targettime;
                                long            secs;
                                int                     usecs;
 
-                               targettime = TimestampTzPlusMilliseconds(last_status,
-                                                                                               standby_message_timeout - 1);
+                               targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
                                localTimestampDifference(now,
                                                                                 targettime,
                                                                                 &secs,
@@ -474,19 +512,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                        progname, PQerrorMessage(conn));
                        goto error;
                }
+
+               /* Check the message type. */
                if (copybuf[0] == 'k')
                {
                        /*
                         * keepalive message, sent in 9.2 and newer. We just ignore this
                         * message completely, but need to skip past it in the stream.
                         */
-                       if (r != STREAMING_KEEPALIVE_SIZE)
-                       {
-                               fprintf(stderr,
-                                               _("%s: keepalive message has incorrect size %d\n"),
-                                               progname, r);
-                               goto error;
-                       }
                        continue;
                }
                else if (copybuf[0] != 'w')
@@ -495,15 +528,22 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                        progname, copybuf[0]);
                        goto error;
                }
+
+               /*
+                * Read the header of the XLogData message, enclosed in the CopyData
+                * message. We only need the WAL location field (dataStart), the rest
+                * of the header is ignored.
+                */
+#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */)
                if (r < STREAMING_HEADER_SIZE + 1)
                {
                        fprintf(stderr, _("%s: streaming header too small: %d\n"),
                                        progname, r);
                        goto error;
                }
+               blockpos = recvint64(&copybuf[1]);
 
                /* Extract WAL location for this block */
-               memcpy(&blockpos, copybuf + 1, 8);
                xlogoff = blockpos % XLOG_SEG_SIZE;
 
                /*
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
deleted file mode 100644 (file)
index 396d006..0000000
+++ /dev/null
@@ -1,128 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * walprotocol.h
- *       Definitions relevant to the streaming WAL transmission protocol.
- *
- * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
- *
- * src/include/replication/walprotocol.h
- *
- *-------------------------------------------------------------------------
- */
-#ifndef _WALPROTOCOL_H
-#define _WALPROTOCOL_H
-
-#include "access/xlogdefs.h"
-#include "datatype/timestamp.h"
-
-
-/*
- * All messages from WalSender must contain these fields to allow us to
- * correctly calculate the replication delay.
- */
-typedef struct
-{
-       /* Current end of WAL on the sender */
-       XLogRecPtr      walEnd;
-
-       /* Sender's system clock at the time of transmission */
-       TimestampTz sendTime;
-
-       /*
-        * If replyRequested is set, the client should reply immediately to this
-        * message, to avoid a timeout disconnect.
-        */
-       bool            replyRequested;
-} WalSndrMessage;
-
-
-/*
- * Header for a WAL data message (message type 'w').  This is wrapped within
- * a CopyData message at the FE/BE protocol level.
- *
- * The header is followed by actual WAL data.  Note that the data length is
- * not specified in the header --- it's just whatever remains in the message.
- *
- * walEnd and sendTime are not essential data, but are provided in case
- * the receiver wants to adjust its behavior depending on how far behind
- * it is.
- */
-typedef struct
-{
-       /* WAL start location of the data included in this message */
-       XLogRecPtr      dataStart;
-
-       /* Current end of WAL on the sender */
-       XLogRecPtr      walEnd;
-
-       /* Sender's system clock at the time of transmission */
-       TimestampTz sendTime;
-} WalDataMessageHeader;
-
-/*
- * Keepalive message from primary (message type 'k'). (lowercase k)
- * This is wrapped within a CopyData message at the FE/BE protocol level.
- *
- * Note that the data length is not specified here.
- */
-typedef WalSndrMessage PrimaryKeepaliveMessage;
-
-/*
- * Reply message from standby (message type 'r').  This is wrapped within
- * a CopyData message at the FE/BE protocol level.
- *
- * Note that the data length is not specified here.
- */
-typedef struct
-{
-       /*
-        * The xlog locations that have been written, flushed, and applied by
-        * standby-side. These may be invalid if the standby-side is unable to or
-        * chooses not to report these.
-        */
-       XLogRecPtr      write;
-       XLogRecPtr      flush;
-       XLogRecPtr      apply;
-
-       /* Sender's system clock at the time of transmission */
-       TimestampTz sendTime;
-
-       /*
-        * If replyRequested is set, the server should reply immediately to this
-        * message, to avoid a timeout disconnect.
-        */
-       bool            replyRequested;
-} StandbyReplyMessage;
-
-/*
- * Hot Standby feedback from standby (message type 'h').  This is wrapped within
- * a CopyData message at the FE/BE protocol level.
- *
- * Note that the data length is not specified here.
- */
-typedef struct
-{
-       /*
-        * The current xmin and epoch from the standby, for Hot Standby feedback.
-        * This may be invalid if the standby-side does not support feedback, or
-        * Hot Standby is not yet available.
-        */
-       TransactionId xmin;
-       uint32          epoch;
-
-       /* Sender's system clock at the time of transmission */
-       TimestampTz sendTime;
-} StandbyHSFeedbackMessage;
-
-/*
- * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
- *
- * We don't have a good idea of what a good value would be; there's some
- * overhead per message in both walsender and walreceiver, but on the other
- * hand sending large batches makes walsender less responsive to signals
- * because signals are checked only between messages.  128kB (with
- * default 8k blocks) seems like a reasonable guess for now.
- */
-#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
-
-#endif   /* _WALPROTOCOL_H */
index e7cdb417e59e6534d6b75c48a6efd226d6bbe349..b4b402f018f9d9e11f6c843c9ecd727675ca9a77 100644 (file)
@@ -206,13 +206,24 @@ extern Datum generate_series_timestamptz(PG_FUNCTION_ARGS);
 /* Internal routines (not fmgr-callable) */
 
 extern TimestampTz GetCurrentTimestamp(void);
-
 extern void TimestampDifference(TimestampTz start_time, TimestampTz stop_time,
                                        long *secs, int *microsecs);
 extern bool TimestampDifferenceExceeds(TimestampTz start_time,
                                                   TimestampTz stop_time,
                                                   int msec);
 
+/*
+ * Prototypes for functions to deal with integer timestamps, when the native
+ * format is float timestamps.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+extern int64 GetCurrentIntegerTimestamp(void);
+extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+#else
+#define GetCurrentIntegerTimestamp()   GetCurrentTimestamp()
+#define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#endif
+
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
 extern pg_time_t timestamptz_to_time_t(TimestampTz t);