Add a message type header to the CopyData messages sent from primary
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 3 Feb 2010 09:47:19 +0000 (09:47 +0000)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 3 Feb 2010 09:47:19 +0000 (09:47 +0000)
to standby in streaming replication. While we only have one message type
at the moment, adding a message type header makes this easier to extend.

doc/src/sgml/protocol.sgml
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/replication/walreceiver.h

index 54e03998ff9c062060026735a5b509ebb250da71..845e41813073dacdbc02a3263028a8d63e96e324 100644 (file)
@@ -1,4 +1,4 @@
-<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.77 2010/01/15 09:18:59 heikki Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/protocol.sgml,v 1.78 2010/02/03 09:47:19 heikki Exp $ -->
 
 <chapter id="protocol">
  <title>Frontend/Backend Protocol</title>
@@ -4179,12 +4179,65 @@ The commands accepted in walsender mode are:
       already been recycled. On success, server responds with a
       CopyOutResponse message, and backend starts to stream WAL as CopyData
       messages.
+      The payload in CopyData message consists of the following format.
      </para>
 
      <para>
-      The payload in each CopyData message consists of an XLogRecPtr,
-      indicating the starting point of the WAL in the message, immediately
-      followed by the WAL data itself.
+      <variablelist>
+      <varlistentry>
+      <term>
+          XLogData (B)
+      </term>
+      <listitem>
+      <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          Byte1('w')
+      </term>
+      <listitem>
+      <para>
+          Identifies the message as WAL data.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The log file number of the LSN, indicating the starting point of
+          the WAL in the message.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The byte offset of the LSN, indicating the starting point of
+          the WAL in the message.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte<replaceable>n</replaceable>
+      </term>
+      <listitem>
+      <para>
+          Data that forms part of WAL data stream.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
      </para>
      <para>
        A single WAL record is never split across two CopyData messages. When
index b7a24e56f5611182a435494329851aac6e48fd68..039370a8515f22621c00c24aa100098163abcaff 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.2 2010/01/20 11:58:44 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -48,8 +48,8 @@ static char *recvBuf = NULL;
 
 /* Prototypes for interface functions */
 static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
-static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
-                         int *len);
+static bool libpqrcv_receive(int timeout, unsigned char *type,
+                                                        char **buffer, int *len);
 static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
@@ -236,13 +236,13 @@ libpqrcv_disconnect(void)
 }
 
 /*
- * Receive any WAL records available from XLOG stream, blocking for
+ * Receive a message available from XLOG stream, blocking for
  * maximum of 'timeout' ms.
  *
  * Returns:
  *
- *   True if data was received. *recptr, *buffer and *len are set to
- *   the WAL location of the received data, buffer holding it, and length,
+ *   True if data was received. *type, *buffer and *len are set to
+ *   the type of the received data, buffer holding it, and length,
  *   respectively.
  *
  *   False if no data was available within timeout, or wait was interrupted
@@ -254,7 +254,7 @@ libpqrcv_disconnect(void)
  * ereports on error.
  */
 static bool
-libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
 {
        int                     rawlen;
 
@@ -275,14 +275,14 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
 
                if (PQconsumeInput(streamConn) == 0)
                        ereport(ERROR,
-                                       (errmsg("could not read xlog records: %s",
+                                       (errmsg("could not receive data from XLOG stream: %s",
                                                        PQerrorMessage(streamConn))));
        }
        justconnected = false;
 
        /* Receive CopyData message */
        rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
-       if (rawlen == 0)        /* no records available yet, then return */
+       if (rawlen == 0)        /* no data available yet, then return */
                return false;
        if (rawlen == -1)       /* end-of-streaming or error */
        {
@@ -297,22 +297,18 @@ libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
                }
                PQclear(res);
                ereport(ERROR,
-                               (errmsg("could not read xlog records: %s",
+                               (errmsg("could not receive data from XLOG stream: %s",
                                                PQerrorMessage(streamConn))));
        }
        if (rawlen < -1)
                ereport(ERROR,
-                               (errmsg("could not read xlog records: %s",
+                               (errmsg("could not receive data from XLOG stream: %s",
                                                PQerrorMessage(streamConn))));
 
-       if (rawlen < sizeof(XLogRecPtr))
-               ereport(ERROR,
-                               (errmsg("invalid WAL message received from primary")));
-
-       /* Return received WAL records to caller */
-       *recptr = *((XLogRecPtr *) recvBuf);
-       *buffer = recvBuf + sizeof(XLogRecPtr);
-       *len = rawlen - sizeof(XLogRecPtr);
+       /* Return received messages to caller */
+       *type = *((unsigned char *) recvBuf);
+       *buffer = recvBuf + sizeof(*type);
+       *len = rawlen - sizeof(*type);
 
        return true;
 }
index 4a5ba5b4263631e08bc7586fe47ca81abee1416c..a2f15a9a03e50c6082c9a22ed3d469dcbad35d96 100644 (file)
@@ -29,7 +29,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -135,6 +135,7 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
 static void WalRcvDie(int code, Datum arg);
+static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
 
@@ -258,7 +259,7 @@ WalReceiverMain(void)
        /* Loop until end-of-streaming or error */
        for (;;)
        {
-               XLogRecPtr recptr;
+               unsigned char   type;
                char   *buf;
                int             len;
 
@@ -287,17 +288,17 @@ WalReceiverMain(void)
                }
 
                /* Wait a while for data to arrive */
-               if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
+               if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
                {
-                       /* Write received WAL records to disk */
-                       XLogWalRcvWrite(buf, len, recptr);
+                       /* Accept the received data, and process it */
+                       XLogWalRcvProcessMsg(type, buf, len);
 
-                       /* Receive any more WAL records we can without sleeping */
-                       while(walrcv_receive(0, &recptr, &buf, &len))
-                               XLogWalRcvWrite(buf, len, recptr);
+                       /* Receive any more data we can without sleeping */
+                       while(walrcv_receive(0, &type, &buf, &len))
+                               XLogWalRcvProcessMsg(type, buf, len);
 
                        /*
-                        * Now that we've written some records, flush them to disk and
+                        * If we've written some records, flush them to disk and
                         * let the startup process know about them.
                         */
                        XLogWalRcvFlush();
@@ -375,6 +376,36 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
        exit(2);
 }
 
+/*
+ * Accept the message from XLOG stream, and process it.
+ */
+static void
+XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+{
+       switch (type)
+       {
+               case 'w':       /* WAL records */
+               {
+                       XLogRecPtr      recptr;
+
+                       if (len < sizeof(XLogRecPtr))
+                               ereport(ERROR,
+                                               (errmsg("invalid WAL message received from primary")));
+
+                       recptr = *((XLogRecPtr *) buf);
+                       buf += sizeof(XLogRecPtr);
+                       len -= sizeof(XLogRecPtr);
+                       XLogWalRcvWrite(buf, len, recptr);
+                       break;
+               }
+               default:
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                        errmsg("invalid replication message type %d",
+                                                       type)));
+       }
+}
+
 /*
  * Write XLOG data to disk.
  */
index 0eb074ffe732cd432adece4c23d4c52e09a822db..0115b70fa2a20fb50c77914c51a4fb4bfc2b07a6 100644 (file)
@@ -30,7 +30,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.4 2010/01/27 16:41:09 heikki Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.5 2010/02/03 09:47:19 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -659,6 +659,7 @@ XLogSend(StringInfo outMsg)
                 * have the same byte order. If they have different byte order, we
                 * don't reach here.
                 */
+               pq_sendbyte(outMsg, 'w');
                pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
 
                if (endptr.xlogid != startptr.xlogid)
index 083eb4f07fb3f51ea930ed95b3ba1f7948024761..bf7ad41b068da3e0945eb65bf7e255f7e751e306 100644 (file)
@@ -5,7 +5,7 @@
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.6 2010/02/03 09:47:19 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -66,7 +66,8 @@ extern WalRcvData *WalRcv;
 typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
 extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
 
-typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len);
+typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
+                                                                        char **buffer, int *len);
 extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
 
 typedef void (*walrcv_disconnect_type) (void);