]> granicus.if.org Git - postgresql/commitdiff
Allow bidirectional copy messages in streaming replication mode.
authorRobert Haas <rhaas@postgresql.org>
Sat, 11 Dec 2010 14:27:37 +0000 (09:27 -0500)
committerRobert Haas <rhaas@postgresql.org>
Sat, 11 Dec 2010 14:27:37 +0000 (09:27 -0500)
Fujii Masao.  Review by Alvaro Herrera, Tom Lane, and myself.

doc/src/sgml/libpq.sgml
doc/src/sgml/protocol.sgml
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/replication/walreceiver.h
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-protocol2.c
src/interfaces/libpq/fe-protocol3.c
src/interfaces/libpq/libpq-fe.h
src/interfaces/libpq/libpq-int.h

index c253c7c61c99a2abe8543d1905922a91df744f97..c502439356673e0db3abece957c0e5e3de3b4696 100644 (file)
@@ -2194,6 +2194,16 @@ ExecStatusType PQresultStatus(const PGresult *res);
           </listitem>
          </varlistentry>
 
+         <varlistentry id="libpq-pgres-copy-both">
+          <term><literal>PGRES_COPY_BOTH</literal></term>
+          <listitem>
+           <para>
+            Copy In/Out (to and from server) data transfer started.  This is
+            currently used only for streaming replication.
+           </para>
+          </listitem>
+         </varlistentry>
+
          <varlistentry id="libpq-pgres-bad-response">
           <term><literal>PGRES_BAD_RESPONSE</literal></term>
           <listitem>
index 7b2482be5a454e4bdd07a98eb927314edb4b356f..e3d636d557f1de527b193032b8087534e5a74b59 100644 (file)
    </para>
 
    <para>
-    The CopyInResponse and CopyOutResponse messages include fields that
-    inform the frontend of the number of columns per row and the format
-    codes being used for each column.  (As of the present implementation,
-    all columns in a given <command>COPY</> operation will use the same
-    format, but the message design does not assume this.)
+    There is another Copy-related mode called Copy-both, which allows
+    high-speed bulk data transfer to <emphasis>and</> from the server.
+    Copy-both mode is initiated when a backend in walsender mode
+    executes a <command>START_REPLICATION</command> statement.  The
+    backend sends a CopyBothResponse message to the frontend.  Both
+    the backend and the frontend may then send CopyData messages
+    until the connection is terminated.  See see <xref
+    linkend="protocol-replication">.
    </para>
+
+   <para>
+    The CopyInResponse, CopyOutResponse and CopyBothResponse messages
+    include fields that inform the frontend of the number of columns
+    per row and the format codes being used for each column.  (As of
+    the present implementation, all columns in a given <command>COPY</>
+    operation will use the same format, but the message design does not
+    assume this.)
+   </para>
+
   </sect2>
 
   <sect2 id="protocol-async">
@@ -1344,7 +1357,7 @@ The commands accepted in walsender mode are:
       WAL position <replaceable>XXX</>/<replaceable>XXX</>.
       The server can reply with an error, e.g. if the requested section of WAL
       has already been recycled. On success, server responds with a
-      CopyOutResponse message, and then starts to stream WAL to the frontend.
+      CopyBothResponse message, and then starts to stream WAL to the frontend.
       WAL will continue to be streamed until the connection is broken;
       no further commands will be accepted.
      </para>
@@ -2694,6 +2707,79 @@ CopyOutResponse (B)
 </varlistentry>
 
 
+<varlistentry>
+<term>
+CopyBothResponse (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('W')
+</term>
+<listitem>
+<para>
+                Identifies the message as a Start Copy Both response.
+                This message is used only for Streaming Replication.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                0 indicates the overall <command>COPY</command> format
+                is textual (rows separated by newlines, columns
+                separated by separator characters, etc). 1 indicates
+                the overall copy format is binary (similar to DataRow
+                format). See <xref linkend="sql-copy"> for more information.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int16
+</term>
+<listitem>
+<para>
+                The number of columns in the data to be copied
+                (denoted <replaceable>N</> below).
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int16[<replaceable>N</>]
+</term>
+<listitem>
+<para>
+                The format codes to be used for each column.
+                Each must presently be zero (text) or one (binary).
+                All must be zero if the overall copy format is textual.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
 <varlistentry>
 <term>
 DataRow (B)
index f66a6b46b981073ee3a2ca23b7860c6042578312..d1ab36755f4eecbdef86653c3c42054c8cc8a045 100644 (file)
@@ -50,6 +50,7 @@ static char *recvBuf = NULL;
 static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
 static bool libpqrcv_receive(int timeout, unsigned char *type,
                                 char **buffer, int *len);
+static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
@@ -64,10 +65,11 @@ _PG_init(void)
 {
        /* Tell walreceiver how to reach us */
        if (walrcv_connect != NULL || walrcv_receive != NULL ||
-               walrcv_disconnect != NULL)
+               walrcv_send != NULL || walrcv_disconnect != NULL)
                elog(ERROR, "libpqwalreceiver already loaded");
        walrcv_connect = libpqrcv_connect;
        walrcv_receive = libpqrcv_receive;
+       walrcv_send = libpqrcv_send;
        walrcv_disconnect = libpqrcv_disconnect;
 }
 
@@ -157,7 +159,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
        snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
                         startpoint.xlogid, startpoint.xrecoff);
        res = libpqrcv_PQexec(cmd);
-       if (PQresultStatus(res) != PGRES_COPY_OUT)
+       if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
                PQclear(res);
                ereport(ERROR,
@@ -303,6 +305,7 @@ libpqrcv_PQexec(const char *query)
 
                if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
                        PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+                       PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
                        PQstatus(streamConn) == CONNECTION_BAD)
                        break;
        }
@@ -398,3 +401,18 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
 
        return true;
 }
+
+/*
+ * Send a message to XLOG stream.
+ *
+ * ereports on error.
+ */
+static void
+libpqrcv_send(const char *buffer, int nbytes)
+{
+       if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+               PQflush(streamConn))
+               ereport(ERROR,
+                               (errmsg("could not send data to WAL stream: %s",
+                                               PQerrorMessage(streamConn))));
+}
index a49ff6c896bdc6b9f8125032092c60f97c48c3c7..fac3be340f913cbb9fdaf4afe06b2cddff9ec098 100644 (file)
@@ -57,6 +57,7 @@ bool          am_walreceiver;
 /* libpqreceiver hooks to these when loaded */
 walrcv_connect_type walrcv_connect = NULL;
 walrcv_receive_type walrcv_receive = NULL;
+walrcv_send_type walrcv_send = NULL;
 walrcv_disconnect_type walrcv_disconnect = NULL;
 
 #define NAPTIME_PER_CYCLE 100  /* max sleep time between cycles (100ms) */
@@ -247,7 +248,7 @@ WalReceiverMain(void)
        /* Load the libpq-specific functions */
        load_file("libpqwalreceiver", false);
        if (walrcv_connect == NULL || walrcv_receive == NULL ||
-               walrcv_disconnect == NULL)
+               walrcv_send == NULL || walrcv_disconnect == NULL)
                elog(ERROR, "libpqwalreceiver didn't initialize correctly");
 
        /*
index d2b9e5c5f9aaa939d8f569f3232e523818ec4131..c8d2433158eec59113912d559876f628b7e9e74b 100644 (file)
@@ -287,8 +287,8 @@ WalSndHandshake(void)
                                                                        (errcode(ERRCODE_CANNOT_CONNECT_NOW),
                                                                         errmsg("standby connections not allowed because wal_level=minimal")));
 
-                                               /* Send a CopyOutResponse message, and start streaming */
-                                               pq_beginmessage(&buf, 'H');
+                                               /* Send a CopyBothResponse message, and start streaming */
+                                               pq_beginmessage(&buf, 'W');
                                                pq_sendbyte(&buf, 0);
                                                pq_sendint(&buf, 0, 2);
                                                pq_endmessage(&buf);
index df7eadfa9caea7611ff0d64842a028ce02743603..485df782ff060384ba6ff4d7393939d38448f82c 100644 (file)
@@ -84,6 +84,9 @@ typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
                                                                                                 char **buffer, int *len);
 extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
 
+typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+extern PGDLLIMPORT walrcv_send_type walrcv_send;
+
 typedef void (*walrcv_disconnect_type) (void);
 extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
 
index 8f25f5eb27c84151419900ebaf3a3b4110b34ad5..9858faeaa64616e140d382417c7392ecaf0f6daa 100644 (file)
@@ -35,6 +35,7 @@ char     *const pgresStatus[] = {
        "PGRES_TUPLES_OK",
        "PGRES_COPY_OUT",
        "PGRES_COPY_IN",
+       "PGRES_COPY_BOTH",
        "PGRES_BAD_RESPONSE",
        "PGRES_NONFATAL_ERROR",
        "PGRES_FATAL_ERROR"
@@ -174,6 +175,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
                        case PGRES_TUPLES_OK:
                        case PGRES_COPY_OUT:
                        case PGRES_COPY_IN:
+                       case PGRES_COPY_BOTH:
                                /* non-error cases */
                                break;
                        default:
@@ -1591,6 +1593,12 @@ PQgetResult(PGconn *conn)
                        else
                                res = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
                        break;
+               case PGASYNC_COPY_BOTH:
+                       if (conn->result && conn->result->resultStatus == PGRES_COPY_BOTH)
+                               res = pqPrepareAsyncResult(conn);
+                       else
+                               res = PQmakeEmptyPGresult(conn, PGRES_COPY_BOTH);
+                       break;
                default:
                        printfPQExpBuffer(&conn->errorMessage,
                                                          libpq_gettext("unexpected asyncStatus: %d\n"),
@@ -1775,6 +1783,13 @@ PQexecStart(PGconn *conn)
                                return false;
                        }
                }
+               else if (resultStatus == PGRES_COPY_BOTH)
+               {
+                       /* We don't allow PQexec during COPY BOTH */
+                       printfPQExpBuffer(&conn->errorMessage,
+                        libpq_gettext("PQexec not allowed during COPY BOTH\n"));
+                       return false;                   
+               }
                /* check for loss of connection, too */
                if (conn->status == CONNECTION_BAD)
                        return false;
@@ -1798,7 +1813,7 @@ PQexecFinish(PGconn *conn)
         * than one --- but merge error messages if we get more than one error
         * result.
         *
-        * We have to stop if we see copy in/out, however. We will resume parsing
+        * We have to stop if we see copy in/out/both, however. We will resume parsing
         * after application performs the data transfer.
         *
         * Also stop if the connection is lost (else we'll loop infinitely).
@@ -1827,6 +1842,7 @@ PQexecFinish(PGconn *conn)
                lastResult = result;
                if (result->resultStatus == PGRES_COPY_IN ||
                        result->resultStatus == PGRES_COPY_OUT ||
+                       result->resultStatus == PGRES_COPY_BOTH ||
                        conn->status == CONNECTION_BAD)
                        break;
        }
@@ -2000,7 +2016,7 @@ PQnotifies(PGconn *conn)
 }
 
 /*
- * PQputCopyData - send some data to the backend during COPY IN
+ * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH
  *
  * Returns 1 if successful, 0 if data could not be sent (only possible
  * in nonblock mode), or -1 if an error occurs.
@@ -2010,7 +2026,8 @@ PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
 {
        if (!conn)
                return -1;
-       if (conn->asyncStatus != PGASYNC_COPY_IN)
+       if (conn->asyncStatus != PGASYNC_COPY_IN &&
+               conn->asyncStatus != PGASYNC_COPY_BOTH)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                                  libpq_gettext("no COPY in progress\n"));
@@ -2148,6 +2165,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 
 /*
  * PQgetCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH
  *
  * If successful, sets *buffer to point to a malloc'd row of data, and
  * returns row length (always > 0) as result.
@@ -2161,7 +2179,8 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
        *buffer = NULL;                         /* for all failure cases */
        if (!conn)
                return -2;
-       if (conn->asyncStatus != PGASYNC_COPY_OUT)
+       if (conn->asyncStatus != PGASYNC_COPY_OUT &&
+               conn->asyncStatus != PGASYNC_COPY_BOTH)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                                  libpq_gettext("no COPY in progress\n"));
index 31eff831ee03e56b4f4936d9392ab5b0f851dc21..ccf13423294fb9ff3d57b532f4bb83d4ecfd5b4b 100644 (file)
@@ -541,6 +541,10 @@ pqParseInput2(PGconn *conn)
                                case 'H':               /* Start Copy Out */
                                        conn->asyncStatus = PGASYNC_COPY_OUT;
                                        break;
+                                       /*
+                                        * Don't need to process CopyBothResponse here because
+                                        * it never arrives from the server during protocol 2.0.
+                                        */
                                default:
                                        printfPQExpBuffer(&conn->errorMessage,
                                                                          libpq_gettext(
index cf9407de7ae76553ff22c43a89cf9ae32c16b911..c398304156c1742838741b231aaa41a0bf93d76e 100644 (file)
@@ -358,6 +358,12 @@ pqParseInput3(PGconn *conn)
                                        conn->asyncStatus = PGASYNC_COPY_OUT;
                                        conn->copy_already_done = 0;
                                        break;
+                               case 'W':               /* Start Copy Both */
+                                       if (getCopyStart(conn, PGRES_COPY_BOTH))
+                                               return;
+                                       conn->asyncStatus = PGASYNC_COPY_BOTH;
+                                       conn->copy_already_done = 0;
+                                       break;
                                case 'd':               /* Copy Data */
 
                                        /*
@@ -1196,7 +1202,8 @@ getNotify(PGconn *conn)
 }
 
 /*
- * getCopyStart - process CopyInResponse or CopyOutResponse message
+ * getCopyStart - process CopyInResponse, CopyOutResponse or
+ * CopyBothResponse message
  *
  * parseInput already read the message type and length.
  */
@@ -1367,6 +1374,7 @@ getCopyDataMessage(PGconn *conn)
 
 /*
  * PQgetCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH
  *
  * If successful, sets *buffer to point to a malloc'd row of data, and
  * returns row length (always > 0) as result.
@@ -1390,10 +1398,10 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
                if (msgLength < 0)
                {
                        /*
-                        * On end-of-copy, exit COPY_OUT mode and let caller read status
-                        * with PQgetResult().  The normal case is that it's Copy Done,
-                        * but we let parseInput read that.  If error, we expect the state
-                        * was already changed.
+                        * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
+                        * read status with PQgetResult().      The normal case is that it's
+                        * Copy Done, but we let parseInput read that.  If error, we expect
+                        * the state was already changed.
                         */
                        if (msgLength == -1)
                                conn->asyncStatus = PGASYNC_BUSY;
index d9e3067894d194f64d1a91d3ec9e32a2bd763dde..271afedb7b8557b1ce054ebf88288514384b8c39 100644 (file)
@@ -85,6 +85,7 @@ typedef enum
                                                                 * contains the result tuples */
        PGRES_COPY_OUT,                         /* Copy Out data transfer in progress */
        PGRES_COPY_IN,                          /* Copy In data transfer in progress */
+       PGRES_COPY_BOTH,                        /* Copy In/Out data transfer in progress */
        PGRES_BAD_RESPONSE,                     /* an unexpected response was recv'd from the
                                                                 * backend */
        PGRES_NONFATAL_ERROR,           /* notice or warning message */
index ce5f330f9ea47a69fc5cb611aa30ddf518b243f3..bac3d0b3bf5f8e79e0bc54ad6a7390d6f6b96dce 100644 (file)
@@ -218,7 +218,8 @@ typedef enum
        PGASYNC_BUSY,                           /* query in progress */
        PGASYNC_READY,                          /* result ready for PQgetResult */
        PGASYNC_COPY_IN,                        /* Copy In data transfer in progress */
-       PGASYNC_COPY_OUT                        /* Copy Out data transfer in progress */
+       PGASYNC_COPY_OUT,                       /* Copy Out data transfer in progress */
+       PGASYNC_COPY_BOTH                       /* Copy In/Out data transfer in progress */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */