]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
Adjust signature of walrcv_receive hook.
[postgresql] / src / backend / replication / libpqwalreceiver / libpqwalreceiver.c
index 96e73fb6a4d1b243b4fcfdc4936759412ef7fd95..b61e39d7d8a70263f90505769f8bc0701b3cfea3 100644 (file)
@@ -6,7 +6,7 @@
  * loaded as a dynamic module to avoid linking the main server binary with
  * libpq.
  *
- * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
@@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch
 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
                                                char *slotname);
 static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int     libpqrcv_receive(int timeout, char **buffer);
+static int     libpqrcv_receive(char **buffer, pgsocket *wait_fd);
 static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
@@ -89,15 +89,15 @@ _PG_init(void)
 static void
 libpqrcv_connect(char *conninfo)
 {
-       const char      *keys[5];
-       const char      *vals[5];
+       const char *keys[5];
+       const char *vals[5];
 
        /*
-        * We use the expand_dbname parameter to process the connection string
-        * (or URI), and pass some extra options. The deliberately undocumented
-        * parameter "replication=true" makes it a replication connection.
-        * The database name is ignored by the server in replication mode, but
-        * specify "replication" for .pgpass lookup.
+        * We use the expand_dbname parameter to process the connection string (or
+        * URI), and pass some extra options. The deliberately undocumented
+        * parameter "replication=true" makes it a replication connection. The
+        * database name is ignored by the server in replication mode, but specify
+        * "replication" for .pgpass lookup.
         */
        keys[0] = "dbname";
        vals[0] = conninfo;
@@ -262,6 +262,7 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
                ereport(ERROR,
                                (errmsg("error reading result of streaming command: %s",
                                                PQerrorMessage(streamConn))));
+       PQclear(res);
 
        /* Verify that there are no more results */
        res = PQgetResult(streamConn);
@@ -330,7 +331,7 @@ libpq_select(int timeout_ms)
        if (PQsocket(streamConn) < 0)
                ereport(ERROR,
                                (errcode_for_socket_access(),
-                                errmsg("socket not open")));
+                                errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
 
        /* We use poll(2) if available, otherwise select(2) */
        {
@@ -462,8 +463,7 @@ libpqrcv_disconnect(void)
 }
 
 /*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
  *
  * Returns:
  *
@@ -471,15 +471,15 @@ libpqrcv_disconnect(void)
  *      point to a buffer holding the received message. The buffer is only valid
  *      until the next libpqrcv_* call.
  *
- *      0 if no data was available within timeout, or wait was interrupted
- *      by signal.
+ *      If no data was available immediately, returns 0, and *wait_fd is set to a
+ *      socket descriptor which can be waited on before trying again.
  *
  *      -1 if the server ended the COPY.
  *
  * ereports on error.
  */
 static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, pgsocket *wait_fd)
 {
        int                     rawlen;
 
@@ -491,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer)
        rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
        if (rawlen == 0)
        {
-               /*
-                * No data available yet. If the caller requested to block, wait for
-                * more data to arrive.
-                */
-               if (timeout > 0)
-               {
-                       if (!libpq_select(timeout))
-                               return 0;
-               }
-
+               /* Try consuming some data. */
                if (PQconsumeInput(streamConn) == 0)
                        ereport(ERROR,
                                        (errmsg("could not receive data from WAL stream: %s",
@@ -509,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer)
                /* Now that we've consumed some input, try again */
                rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
                if (rawlen == 0)
+               {
+                       /* Tell caller to try again when our socket is ready. */
+                       *wait_fd = PQsocket(streamConn);
                        return 0;
+               }
        }
        if (rawlen == -1)                       /* end-of-streaming or error */
        {