uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms);
+static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
int bytes_written;
int64 now;
int hdr_len;
-
- if (copybuf != NULL)
- {
- PQfreemem(copybuf);
- copybuf = NULL;
- }
+ long sleeptime;
/*
* Check if we should continue streaming, or abort at this point.
last_status = now;
}
- r = PQgetCopyData(conn, ©buf, 1);
- if (r == 0)
+ /*
+ * Compute how long send/receive loops should sleep
+ */
+ if (standby_message_timeout && still_sending)
{
- /*
- * No data available. Wait for some to appear, but not longer than
- * the specified timeout, so that we can ping the server.
- */
- fd_set input_mask;
- struct timeval timeout;
- struct timeval *timeoutptr;
-
- FD_ZERO(&input_mask);
- FD_SET(PQsocket(conn), &input_mask);
- if (standby_message_timeout && still_sending)
+ int64 targettime;
+ long secs;
+ int usecs;
+
+ targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+ feTimestampDifference(now,
+ targettime,
+ &secs,
+ &usecs);
+ /* Always sleep at least 1 sec */
+ if (secs <= 0)
{
- int64 targettime;
- long secs;
- int usecs;
-
- targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
- feTimestampDifference(now,
- targettime,
- &secs,
- &usecs);
- if (secs <= 0)
- timeout.tv_sec = 1; /* Always sleep at least 1 sec */
- else
- timeout.tv_sec = secs;
- timeout.tv_usec = usecs;
- timeoutptr = &timeout;
+ secs = 1;
+ usecs = 0;
}
- else
- timeoutptr = NULL;
- r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
- if (r == 0 || (r < 0 && errno == EINTR))
- {
- /*
- * Got a timeout or signal. Continue the loop and either
- * deliver a status packet to the server or just go back into
- * blocking.
- */
- continue;
- }
- else if (r < 0)
- {
- fprintf(stderr, _("%s: select() failed: %s\n"),
- progname, strerror(errno));
- goto error;
- }
- /* Else there is actually data on the socket */
- if (PQconsumeInput(conn) == 0)
- {
- fprintf(stderr,
- _("%s: could not receive data from WAL stream: %s"),
- progname, PQerrorMessage(conn));
- goto error;
- }
- continue;
+ sleeptime = secs * 1000 + usecs / 1000;
}
+ else
+ sleeptime = -1;
+
+ r = CopyStreamReceive(conn, sleeptime, ©buf);
+ if (r == 0)
+ continue;
if (r == -1)
+ goto error;
+ if (r == -2)
{
PGresult *res = PQgetResult(conn);
}
if (copybuf != NULL)
PQfreemem(copybuf);
+ copybuf = NULL;
*stoppos = blockpos;
return res;
}
- if (r == -2)
- {
- fprintf(stderr, _("%s: could not read COPY data: %s"),
- progname, PQerrorMessage(conn));
- goto error;
- }
/* Check the message type. */
if (copybuf[0] == 'k')
PQfreemem(copybuf);
return NULL;
}
+
+/*
+ * Wait until we can read CopyData message, or timeout.
+ *
+ * Returns 1 if data has become available for reading, 0 if timed out
+ * or interrupted by signal, and -1 on an error.
+ */
+static int
+CopyStreamPoll(PGconn *conn, long timeout_ms)
+{
+ int ret;
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *timeoutptr;
+
+ if (PQsocket(conn) < 0)
+ {
+ fprintf(stderr, _("%s: socket not open"), progname);
+ return -1;
+ }
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(conn), &input_mask);
+
+ if (timeout_ms < 0)
+ timeoutptr = NULL;
+ else
+ {
+ timeout.tv_sec = timeout_ms / 1000L;
+ timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
+ timeoutptr = &timeout;
+ }
+
+ ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+ if (ret == 0 || (ret < 0 && errno == EINTR))
+ return 0; /* Got a timeout or signal */
+ else if (ret < 0)
+ {
+ fprintf(stderr, _("%s: select() failed: %s\n"),
+ progname, strerror(errno));
+ return -1;
+ }
+
+ return 1;
+}
+
+/*
+ * Receive CopyData message available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to a buffer holding the received message. The buffer is only valid
+ * until the next CopyStreamReceive call.
+ *
+ * 0 if no data was available within timeout, or wait was interrupted
+ * by signal. -1 on error. -2 if the server ended the COPY.
+ */
+static int
+CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+{
+ static char *copybuf = NULL;
+ int rawlen;
+
+ if (copybuf != NULL)
+ PQfreemem(copybuf);
+ copybuf = NULL;
+ *buffer = NULL;
+
+ /* Try to receive a CopyData message */
+ rawlen = PQgetCopyData(conn, ©buf, 1);
+ if (rawlen == 0)
+ {
+ /*
+ * No data available. Wait for some to appear, but not longer than
+ * the specified timeout, so that we can ping the server.
+ */
+ if (timeout > 0)
+ {
+ int ret;
+
+ ret = CopyStreamPoll(conn, timeout);
+ if (ret <= 0)
+ return ret;
+ }
+
+ /* Else there is actually data on the socket */
+ if (PQconsumeInput(conn) == 0)
+ {
+ fprintf(stderr,
+ _("%s: could not receive data from WAL stream: %s"),
+ progname, PQerrorMessage(conn));
+ return -1;
+ }
+
+ /* Now that we've consumed some input, try again */
+ rawlen = PQgetCopyData(conn, ©buf, 1);
+ if (rawlen == 0)
+ return 0;
+ }
+ if (rawlen == -1) /* end-of-streaming or error */
+ return -2;
+ if (rawlen == -2)
+ {
+ fprintf(stderr, _("%s: could not read COPY data: %s"),
+ progname, PQerrorMessage(conn));
+ return -1;
+ }
+
+ /* Return received messages to caller */
+ *buffer = copybuf;
+ return rawlen;
+}