static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+static bool still_sending = true; /* feedback still needs to be sent? */
+
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ XLogRecPtr blockpos, int64 *last_status);
+static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+ XLogRecPtr *blockpos, uint32 timeline,
+ char *basedir, stream_stop_callback stream_stop,
+ char *partial_suffix);
+static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+ XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+ XLogRecPtr *stoppos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
char *copybuf = NULL;
int64 last_status = -1;
XLogRecPtr blockpos = startpos;
- bool still_sending = true;
+
+ still_sending = true;
while (1)
{
int r;
- int xlogoff;
- int bytes_left;
- int bytes_written;
int64 now;
- int hdr_len;
long sleeptime;
/*
goto error;
if (r == -2)
{
- PGresult *res = PQgetResult(conn);
-
- /*
- * The server closed its end of the copy stream. If we haven't
- * closed ours already, we need to do so now, unless the server
- * threw an error, in which case we don't.
- */
- if (still_sending)
- {
- if (!close_walfile(basedir, partial_suffix, blockpos))
- {
- /* Error message written in close_walfile() */
- PQclear(res);
- goto error;
- }
- if (PQresultStatus(res) == PGRES_COPY_IN)
- {
- if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
- {
- fprintf(stderr,
- _("%s: could not send copy-end packet: %s"),
- progname, PQerrorMessage(conn));
- PQclear(res);
- goto error;
- }
- PQclear(res);
- res = PQgetResult(conn);
- }
- still_sending = false;
- }
- if (copybuf != NULL)
- PQfreemem(copybuf);
- copybuf = NULL;
- *stoppos = blockpos;
- return res;
+ PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+ basedir, partial_suffix, stoppos);
+ if (res == NULL)
+ goto error;
+ else
+ return res;
}
/* Check the message type. */
if (copybuf[0] == 'k')
{
- int pos;
- bool replyRequested;
-
- /*
- * Parse the keepalive message, enclosed in the CopyData message.
- * We just check if the server requested a reply, and ignore the
- * rest.
- */
- pos = 1; /* skip msgtype 'k' */
- pos += 8; /* skip walEnd */
- pos += 8; /* skip sendTime */
-
- if (r < pos + 1)
- {
- fprintf(stderr, _("%s: streaming header too small: %d\n"),
- progname, r);
+ if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+ &last_status))
goto error;
- }
- replyRequested = copybuf[pos];
-
- /* If the server requested an immediate reply, send one. */
- if (replyRequested && still_sending)
- {
- now = feGetCurrentTimestamp();
- if (!sendFeedback(conn, blockpos, now, false))
- goto error;
- last_status = now;
- }
}
else if (copybuf[0] == 'w')
{
- /*
- * Once we've decided we don't want to receive any more, just
- * ignore any subsequent XLogData messages.
- */
- if (!still_sending)
- continue;
-
- /*
- * Read the header of the XLogData message, enclosed in the
- * CopyData message. We only need the WAL location field
- * (dataStart), the rest of the header is ignored.
- */
- hdr_len = 1; /* msgtype 'w' */
- hdr_len += 8; /* dataStart */
- hdr_len += 8; /* walEnd */
- hdr_len += 8; /* sendTime */
- if (r < hdr_len)
- {
- fprintf(stderr, _("%s: streaming header too small: %d\n"),
- progname, r);
+ if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+ timeline, basedir, stream_stop, partial_suffix))
goto error;
- }
- blockpos = fe_recvint64(©buf[1]);
-
- /* Extract WAL location for this block */
- xlogoff = blockpos % XLOG_SEG_SIZE;
-
- /*
- * Verify that the initial location in the stream matches where we
- * think we are.
- */
- if (walfile == -1)
- {
- /* No file open yet */
- if (xlogoff != 0)
- {
- fprintf(stderr,
- _("%s: received transaction log record for offset %u with no file open\n"),
- progname, xlogoff);
- goto error;
- }
- }
- else
- {
- /* More data in existing segment */
- /* XXX: store seek value don't reseek all the time */
- if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
- {
- fprintf(stderr,
- _("%s: got WAL data offset %08x, expected %08x\n"),
- progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
- goto error;
- }
- }
-
- bytes_left = r - hdr_len;
- bytes_written = 0;
-
- while (bytes_left)
- {
- int bytes_to_write;
-
- /*
- * If crossing a WAL boundary, only write up until we reach
- * XLOG_SEG_SIZE.
- */
- if (xlogoff + bytes_left > XLOG_SEG_SIZE)
- bytes_to_write = XLOG_SEG_SIZE - xlogoff;
- else
- bytes_to_write = bytes_left;
-
- if (walfile == -1)
- {
- if (!open_walfile(blockpos, timeline,
- basedir, partial_suffix))
- {
- /* Error logged by open_walfile */
- goto error;
- }
- }
-
- if (write(walfile,
- copybuf + hdr_len + bytes_written,
- bytes_to_write) != bytes_to_write)
- {
- fprintf(stderr,
- _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
- progname, bytes_to_write, current_walfile_name,
- strerror(errno));
- goto error;
- }
-
- /* Write was successful, advance our position */
- bytes_written += bytes_to_write;
- bytes_left -= bytes_to_write;
- blockpos += bytes_to_write;
- xlogoff += bytes_to_write;
-
- /* Did we reach the end of a WAL segment? */
- if (blockpos % XLOG_SEG_SIZE == 0)
- {
- if (!close_walfile(basedir, partial_suffix, blockpos))
- /* Error message written in close_walfile() */
- goto error;
-
- xlogoff = 0;
-
- if (still_sending && stream_stop(blockpos, timeline, true))
- {
- if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
- {
- fprintf(stderr, _("%s: could not send copy-end packet: %s"),
- progname, PQerrorMessage(conn));
- goto error;
- }
- still_sending = false;
- break; /* ignore the rest of this XLogData packet */
- }
- }
- }
- /* No more data left to write, receive next copy packet */
}
else
{
*buffer = copybuf;
return rawlen;
}
+
+/*
+ * Process the keepalive message.
+ */
+static bool
+ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ XLogRecPtr blockpos, int64 *last_status)
+{
+ int pos;
+ bool replyRequested;
+ int64 now;
+
+ /*
+ * Parse the keepalive message, enclosed in the CopyData message.
+ * We just check if the server requested a reply, and ignore the
+ * rest.
+ */
+ pos = 1; /* skip msgtype 'k' */
+ pos += 8; /* skip walEnd */
+ pos += 8; /* skip sendTime */
+
+ if (len < pos + 1)
+ {
+ fprintf(stderr, _("%s: streaming header too small: %d\n"),
+ progname, len);
+ return false;
+ }
+ replyRequested = copybuf[pos];
+
+ /* If the server requested an immediate reply, send one. */
+ if (replyRequested && still_sending)
+ {
+ now = feGetCurrentTimestamp();
+ if (!sendFeedback(conn, blockpos, now, false))
+ return false;
+ *last_status = now;
+ }
+
+ return true;
+}
+
+/*
+ * Process XLogData message.
+ */
+static bool
+ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+ XLogRecPtr *blockpos, uint32 timeline,
+ char *basedir, stream_stop_callback stream_stop,
+ char *partial_suffix)
+{
+ int xlogoff;
+ int bytes_left;
+ int bytes_written;
+ int hdr_len;
+
+ /*
+ * Once we've decided we don't want to receive any more, just
+ * ignore any subsequent XLogData messages.
+ */
+ if (!(still_sending))
+ return true;
+
+ /*
+ * Read the header of the XLogData message, enclosed in the
+ * CopyData message. We only need the WAL location field
+ * (dataStart), the rest of the header is ignored.
+ */
+ hdr_len = 1; /* msgtype 'w' */
+ hdr_len += 8; /* dataStart */
+ hdr_len += 8; /* walEnd */
+ hdr_len += 8; /* sendTime */
+ if (len < hdr_len)
+ {
+ fprintf(stderr, _("%s: streaming header too small: %d\n"),
+ progname, len);
+ return false;
+ }
+ *blockpos = fe_recvint64(©buf[1]);
+
+ /* Extract WAL location for this block */
+ xlogoff = *blockpos % XLOG_SEG_SIZE;
+
+ /*
+ * Verify that the initial location in the stream matches where we
+ * think we are.
+ */
+ if (walfile == -1)
+ {
+ /* No file open yet */
+ if (xlogoff != 0)
+ {
+ fprintf(stderr,
+ _("%s: received transaction log record for offset %u with no file open\n"),
+ progname, xlogoff);
+ return false;
+ }
+ }
+ else
+ {
+ /* More data in existing segment */
+ /* XXX: store seek value don't reseek all the time */
+ if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ {
+ fprintf(stderr,
+ _("%s: got WAL data offset %08x, expected %08x\n"),
+ progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ return false;
+ }
+ }
+
+ bytes_left = len - hdr_len;
+ bytes_written = 0;
+
+ while (bytes_left)
+ {
+ int bytes_to_write;
+
+ /*
+ * If crossing a WAL boundary, only write up until we reach
+ * XLOG_SEG_SIZE.
+ */
+ if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+ bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+ else
+ bytes_to_write = bytes_left;
+
+ if (walfile == -1)
+ {
+ if (!open_walfile(*blockpos, timeline,
+ basedir, partial_suffix))
+ {
+ /* Error logged by open_walfile */
+ return false;
+ }
+ }
+
+ if (write(walfile,
+ copybuf + hdr_len + bytes_written,
+ bytes_to_write) != bytes_to_write)
+ {
+ fprintf(stderr,
+ _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
+ progname, bytes_to_write, current_walfile_name,
+ strerror(errno));
+ return false;
+ }
+
+ /* Write was successful, advance our position */
+ bytes_written += bytes_to_write;
+ bytes_left -= bytes_to_write;
+ *blockpos += bytes_to_write;
+ xlogoff += bytes_to_write;
+
+ /* Did we reach the end of a WAL segment? */
+ if (*blockpos % XLOG_SEG_SIZE == 0)
+ {
+ if (!close_walfile(basedir, partial_suffix, *blockpos))
+ /* Error message written in close_walfile() */
+ return false;
+
+ xlogoff = 0;
+
+ if (still_sending && stream_stop(*blockpos, timeline, true))
+ {
+ if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ still_sending = false;
+ return true; /* ignore the rest of this XLogData packet */
+ }
+ }
+ }
+ /* No more data left to write, receive next copy packet */
+
+ return true;
+}
+
+/*
+ * Handle end of the copy stream.
+ */
+static PGresult *
+HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+ XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+ XLogRecPtr *stoppos)
+{
+ PGresult *res = PQgetResult(conn);
+
+ /*
+ * The server closed its end of the copy stream. If we haven't
+ * closed ours already, we need to do so now, unless the server
+ * threw an error, in which case we don't.
+ */
+ if (still_sending)
+ {
+ if (!close_walfile(basedir, partial_suffix, blockpos))
+ {
+ /* Error message written in close_walfile() */
+ PQclear(res);
+ return NULL;
+ }
+ if (PQresultStatus(res) == PGRES_COPY_IN)
+ {
+ if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ {
+ fprintf(stderr,
+ _("%s: could not send copy-end packet: %s"),
+ progname, PQerrorMessage(conn));
+ PQclear(res);
+ return NULL;
+ }
+ res = PQgetResult(conn);
+ }
+ still_sending = false;
+ }
+ if (copybuf != NULL)
+ PQfreemem(copybuf);
+ *stoppos = blockpos;
+ return res;
+}