* loaded as a dynamic module to avoid linking the main server binary with
* libpq.
*
- * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int libpqrcv_receive(int timeout, char **buffer);
+static int libpqrcv_receive(char **buffer, pgsocket *wait_fd);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
static void
libpqrcv_connect(char *conninfo)
{
- const char *keys[5];
- const char *vals[5];
+ const char *keys[5];
+ const char *vals[5];
/*
- * We use the expand_dbname parameter to process the connection string
- * (or URI), and pass some extra options. The deliberately undocumented
- * parameter "replication=true" makes it a replication connection.
- * The database name is ignored by the server in replication mode, but
- * specify "replication" for .pgpass lookup.
+ * We use the expand_dbname parameter to process the connection string (or
+ * URI), and pass some extra options. The deliberately undocumented
+ * parameter "replication=true" makes it a replication connection. The
+ * database name is ignored by the server in replication mode, but specify
+ * "replication" for .pgpass lookup.
*/
keys[0] = "dbname";
vals[0] = conninfo;
ereport(ERROR,
(errmsg("error reading result of streaming command: %s",
PQerrorMessage(streamConn))));
+ PQclear(res);
/* Verify that there are no more results */
res = PQgetResult(streamConn);
if (PQsocket(streamConn) < 0)
ereport(ERROR,
(errcode_for_socket_access(),
- errmsg("socket not open")));
+ errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
/* We use poll(2) if available, otherwise select(2) */
{
}
/*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
*
* Returns:
*
* point to a buffer holding the received message. The buffer is only valid
* until the next libpqrcv_* call.
*
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal.
+ * If no data was available immediately, returns 0, and *wait_fd is set to a
+ * socket descriptor which can be waited on before trying again.
*
* -1 if the server ended the COPY.
*
* ereports on error.
*/
static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, pgsocket *wait_fd)
{
int rawlen;
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
{
- /*
- * No data available yet. If the caller requested to block, wait for
- * more data to arrive.
- */
- if (timeout > 0)
- {
- if (!libpq_select(timeout))
- return 0;
- }
-
+ /* Try consuming some data. */
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
+ {
+ /* Tell caller to try again when our socket is ready. */
+ *wait_fd = PQsocket(streamConn);
return 0;
+ }
}
if (rawlen == -1) /* end-of-streaming or error */
{