]> granicus.if.org Git - postgresql/blobdiff - src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
Adjust signature of walrcv_receive hook.
[postgresql] / src / backend / replication / libpqwalreceiver / libpqwalreceiver.c
index ecec8b345634960d591289ba90b220aad0f5e5da..b61e39d7d8a70263f90505769f8bc0701b3cfea3 100644 (file)
@@ -6,7 +6,7 @@
  * loaded as a dynamic module to avoid linking the main server binary with
  * libpq.
  *
- * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
@@ -50,9 +50,9 @@ static void libpqrcv_connect(char *conninfo);
 static void libpqrcv_identify_system(TimeLineID *primary_tli);
 static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
-                                                                       char *slotname);
+                                               char *slotname);
 static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int     libpqrcv_receive(int timeout, char **buffer);
+static int     libpqrcv_receive(char **buffer, pgsocket *wait_fd);
 static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
@@ -89,18 +89,28 @@ _PG_init(void)
 static void
 libpqrcv_connect(char *conninfo)
 {
-       char            conninfo_repl[MAXCONNINFO + 75];
+       const char *keys[5];
+       const char *vals[5];
 
        /*
-        * Connect using deliberately undocumented parameter: replication. The
+        * We use the expand_dbname parameter to process the connection string (or
+        * URI), and pass some extra options. The deliberately undocumented
+        * parameter "replication=true" makes it a replication connection. The
         * database name is ignored by the server in replication mode, but specify
         * "replication" for .pgpass lookup.
         */
-       snprintf(conninfo_repl, sizeof(conninfo_repl),
-                        "%s dbname=replication replication=true fallback_application_name=walreceiver",
-                        conninfo);
-
-       streamConn = PQconnectdb(conninfo_repl);
+       keys[0] = "dbname";
+       vals[0] = conninfo;
+       keys[1] = "replication";
+       vals[1] = "true";
+       keys[2] = "dbname";
+       vals[2] = "replication";
+       keys[3] = "fallback_application_name";
+       vals[3] = "walreceiver";
+       keys[4] = NULL;
+       vals[4] = NULL;
+
+       streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
        if (PQstatus(streamConn) != CONNECTION_OK)
                ereport(ERROR,
                                (errmsg("could not connect to the primary server: %s",
@@ -131,7 +141,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
                                                "the primary server: %s",
                                                PQerrorMessage(streamConn))));
        }
-       if (PQnfields(res) != 3 || PQntuples(res) != 1)
+       if (PQnfields(res) < 3 || PQntuples(res) != 1)
        {
                int                     ntuples = PQntuples(res);
                int                     nfields = PQnfields(res);
@@ -139,8 +149,8 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
                PQclear(res);
                ereport(ERROR,
                                (errmsg("invalid response from primary server"),
-                                errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
-                                                  ntuples, nfields)));
+                                errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
+                                                  ntuples, nfields, 3, 1)));
        }
        primary_sysid = PQgetvalue(res, 0, 0);
        *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
@@ -152,6 +162,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
                         GetSystemIdentifier());
        if (strcmp(primary_sysid, standby_sysid) != 0)
        {
+               primary_sysid = pstrdup(primary_sysid);
                PQclear(res);
                ereport(ERROR,
                                (errmsg("database system identifier differs between the primary and standby"),
@@ -174,7 +185,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 static bool
 libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
 {
-       char            cmd[64];
+       char            cmd[256];
        PGresult   *res;
 
        /* Start streaming from the point requested by startup process */
@@ -251,6 +262,7 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
                ereport(ERROR,
                                (errmsg("error reading result of streaming command: %s",
                                                PQerrorMessage(streamConn))));
+       PQclear(res);
 
        /* Verify that there are no more results */
        res = PQgetResult(streamConn);
@@ -319,7 +331,7 @@ libpq_select(int timeout_ms)
        if (PQsocket(streamConn) < 0)
                ereport(ERROR,
                                (errcode_for_socket_access(),
-                                errmsg("socket not open")));
+                                errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
 
        /* We use poll(2) if available, otherwise select(2) */
        {
@@ -451,8 +463,7 @@ libpqrcv_disconnect(void)
 }
 
 /*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
  *
  * Returns:
  *
@@ -460,15 +471,15 @@ libpqrcv_disconnect(void)
  *      point to a buffer holding the received message. The buffer is only valid
  *      until the next libpqrcv_* call.
  *
- *      0 if no data was available within timeout, or wait was interrupted
- *      by signal.
+ *      If no data was available immediately, returns 0, and *wait_fd is set to a
+ *      socket descriptor which can be waited on before trying again.
  *
  *      -1 if the server ended the COPY.
  *
  * ereports on error.
  */
 static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, pgsocket *wait_fd)
 {
        int                     rawlen;
 
@@ -480,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer)
        rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
        if (rawlen == 0)
        {
-               /*
-                * No data available yet. If the caller requested to block, wait for
-                * more data to arrive.
-                */
-               if (timeout > 0)
-               {
-                       if (!libpq_select(timeout))
-                               return 0;
-               }
-
+               /* Try consuming some data. */
                if (PQconsumeInput(streamConn) == 0)
                        ereport(ERROR,
                                        (errmsg("could not receive data from WAL stream: %s",
@@ -498,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer)
                /* Now that we've consumed some input, try again */
                rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
                if (rawlen == 0)
+               {
+                       /* Tell caller to try again when our socket is ready. */
+                       *wait_fd = PQsocket(streamConn);
                        return 0;
+               }
        }
        if (rawlen == -1)                       /* end-of-streaming or error */
        {