static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
XLogRecPtr *stoppos);
-static int CopyStreamPoll(PGconn *conn, long timeout_ms);
-static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
+static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+ char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
int len, XLogRecPtr blockpos, TimestampTz *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* return. As long as it returns false, streaming will continue
* indefinitely.
*
+ * If stream_stop() checks for external input, stop_socket should be set to
+ * the FD it checks. This will allow such input to be detected promptly
+ * rather than after standby_message_timeout (which might be indefinite).
+ * Note that signals will interrupt waits for input as well, but that is
+ * race-y since a signal received while busy won't interrupt the wait.
+ *
* standby_message_timeout controls how often we send a message
* back to the master letting it know our progress, in milliseconds.
+ * Zero means no messages are sent.
* This message will only contain the write location, and never
* flush or replay.
*
sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
last_status);
- r = CopyStreamReceive(conn, sleeptime, ©buf);
+ r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);
while (r != 0)
{
if (r == -1)
* Process the received data, and any subsequent data we can read
* without blocking.
*/
- r = CopyStreamReceive(conn, 0, ©buf);
+ r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);
}
}
}
/*
- * Wait until we can read CopyData message, or timeout.
+ * Wait until we can read a CopyData message,
+ * or timeout, or occurrence of a signal or input on the stop_socket.
+ * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
*
* Returns 1 if data has become available for reading, 0 if timed out
- * or interrupted by signal, and -1 on an error.
+ * or interrupted by signal or stop_socket input, and -1 on an error.
*/
static int
-CopyStreamPoll(PGconn *conn, long timeout_ms)
+CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
{
int ret;
fd_set input_mask;
+ int connsocket;
+ int maxfd;
struct timeval timeout;
struct timeval *timeoutptr;
- if (PQsocket(conn) < 0)
+ connsocket = PQsocket(conn);
+ if (connsocket < 0)
{
fprintf(stderr, _("%s: invalid socket: %s"), progname,
PQerrorMessage(conn));
}
FD_ZERO(&input_mask);
- FD_SET(PQsocket(conn), &input_mask);
+ FD_SET(connsocket, &input_mask);
+ maxfd = connsocket;
+ if (stop_socket != PGINVALID_SOCKET)
+ {
+ FD_SET(stop_socket, &input_mask);
+ maxfd = Max(maxfd, stop_socket);
+ }
if (timeout_ms < 0)
timeoutptr = NULL;
timeoutptr = &timeout;
}
- ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
- if (ret == 0 || (ret < 0 && errno == EINTR))
- return 0; /* Got a timeout or signal */
- else if (ret < 0)
+ ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
+
+ if (ret < 0)
{
+ if (errno == EINTR)
+ return 0; /* Got a signal, so not an error */
fprintf(stderr, _("%s: select() failed: %s\n"),
progname, strerror(errno));
return -1;
}
+ if (ret > 0 && FD_ISSET(connsocket, &input_mask))
+ return 1; /* Got input on connection socket */
- return 1;
+ return 0; /* Got timeout or input on stop_socket */
}
/*
* point to a buffer holding the received message. The buffer is only valid
* until the next CopyStreamReceive call.
*
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal. -1 on error. -2 if the server ended the COPY.
+ * Returns 0 if no data was available within timeout, or if wait was
+ * interrupted by signal or stop_socket input.
+ * -1 on error. -2 if the server ended the COPY.
*/
static int
-CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+ char **buffer)
{
char *copybuf = NULL;
int rawlen;
rawlen = PQgetCopyData(conn, ©buf, 1);
if (rawlen == 0)
{
+ int ret;
+
/*
- * No data available. Wait for some to appear, but not longer than the
- * specified timeout, so that we can ping the server.
+ * No data available. Wait for some to appear, but not longer than
+ * the specified timeout, so that we can ping the server. Also stop
+ * waiting if input appears on stop_socket.
*/
- if (timeout != 0)
- {
- int ret;
-
- ret = CopyStreamPoll(conn, timeout);
- if (ret <= 0)
- return ret;
- }
+ ret = CopyStreamPoll(conn, timeout, stop_socket);
+ if (ret <= 0)
+ return ret;
- /* Else there is actually data on the socket */
+ /* Now there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
fprintf(stderr,