* loaded as a dynamic module to avoid linking the main server binary with
* libpq.
*
- * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
static void libpqrcv_identify_system(TimeLineID *primary_tli);
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
- char *slotname);
+ char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int libpqrcv_receive(int timeout, char **buffer);
+static int libpqrcv_receive(char **buffer, pgsocket *wait_fd);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
static void
libpqrcv_connect(char *conninfo)
{
- char conninfo_repl[MAXCONNINFO + 75];
+ const char *keys[5];
+ const char *vals[5];
/*
- * Connect using deliberately undocumented parameter: replication. The
+ * We use the expand_dbname parameter to process the connection string (or
+ * URI), and pass some extra options. The deliberately undocumented
+ * parameter "replication=true" makes it a replication connection. The
* database name is ignored by the server in replication mode, but specify
* "replication" for .pgpass lookup.
*/
- snprintf(conninfo_repl, sizeof(conninfo_repl),
- "%s dbname=replication replication=true fallback_application_name=walreceiver",
- conninfo);
-
- streamConn = PQconnectdb(conninfo_repl);
+ keys[0] = "dbname";
+ vals[0] = conninfo;
+ keys[1] = "replication";
+ vals[1] = "true";
+ keys[2] = "dbname";
+ vals[2] = "replication";
+ keys[3] = "fallback_application_name";
+ vals[3] = "walreceiver";
+ keys[4] = NULL;
+ vals[4] = NULL;
+
+ streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
if (PQstatus(streamConn) != CONNECTION_OK)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s",
"the primary server: %s",
PQerrorMessage(streamConn))));
}
- if (PQnfields(res) != 3 || PQntuples(res) != 1)
+ if (PQnfields(res) < 3 || PQntuples(res) != 1)
{
int ntuples = PQntuples(res);
int nfields = PQnfields(res);
PQclear(res);
ereport(ERROR,
(errmsg("invalid response from primary server"),
- errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
- ntuples, nfields)));
+ errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
+ ntuples, nfields, 3, 1)));
}
primary_sysid = PQgetvalue(res, 0, 0);
*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
GetSystemIdentifier());
if (strcmp(primary_sysid, standby_sysid) != 0)
{
+ primary_sysid = pstrdup(primary_sysid);
PQclear(res);
ereport(ERROR,
(errmsg("database system identifier differs between the primary and standby"),
static bool
libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
{
- char cmd[64];
+ char cmd[256];
PGresult *res;
/* Start streaming from the point requested by startup process */
ereport(ERROR,
(errmsg("error reading result of streaming command: %s",
PQerrorMessage(streamConn))));
+ PQclear(res);
/* Verify that there are no more results */
res = PQgetResult(streamConn);
if (PQsocket(streamConn) < 0)
ereport(ERROR,
(errcode_for_socket_access(),
- errmsg("socket not open")));
+ errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
/* We use poll(2) if available, otherwise select(2) */
{
}
/*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
*
* Returns:
*
* point to a buffer holding the received message. The buffer is only valid
* until the next libpqrcv_* call.
*
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal.
+ * If no data was available immediately, returns 0, and *wait_fd is set to a
+ * socket descriptor which can be waited on before trying again.
*
* -1 if the server ended the COPY.
*
* ereports on error.
*/
static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, pgsocket *wait_fd)
{
int rawlen;
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
{
- /*
- * No data available yet. If the caller requested to block, wait for
- * more data to arrive.
- */
- if (timeout > 0)
- {
- if (!libpq_select(timeout))
- return 0;
- }
-
+ /* Try consuming some data. */
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
+ {
+ /* Tell caller to try again when our socket is ready. */
+ *wait_fd = PQsocket(streamConn);
return 0;
+ }
}
if (rawlen == -1) /* end-of-streaming or error */
{