goes into copy-in mode, and the server may not send any more CopyData
messages. After both sides have sent a CopyDone message, the copy mode
is terminated, and the backend reverts to the command-processing mode.
- See <xref linkend="protocol-replication"> for more information on the
- subprotocol transmitted over copy-both mode.
+ In the event of a backend-detected error during copy-both mode,
+ the backend will issue an ErrorResponse message, discard frontend messages
+ until a Sync message is received, and then issue ReadyForQuery and return
+ to normal processing. The frontend should treat receipt of ErrorResponse
+ as terminating the copy in both directions; no CopyDone should be sent
+ in this case. See <xref linkend="protocol-replication"> for more
+ information on the subprotocol transmitted over copy-both mode.
</para>
<para>
static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
-static bool HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
- char *basedir, stream_stop_callback stream_stop,
- int standby_message_timeout, char *partial_suffix,
- XLogRecPtr *stoppos);
+static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
+ uint32 timeline, char *basedir,
+ stream_stop_callback stream_stop, int standby_message_timeout,
+ char *partial_suffix, XLogRecPtr *stoppos);
/*
* Open a new WAL file in the specified directory.
PQclear(res);
/* Stream the WAL */
- if (!HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
- standby_message_timeout, partial_suffix,
- &stoppos))
+ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
+ standby_message_timeout, partial_suffix,
+ &stoppos);
+ if (res == NULL)
goto error;
/*
* restart streaming from the next timeline.
*/
- res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
* The main loop of ReceiveXLogStream. Handles the COPY stream after
* initiating streaming with the START_STREAMING command.
*
- * If the COPY ends normally, returns true and sets *stoppos to the last
- * byte written. On error, returns false.
+ * If the COPY ends (not necessarily successfully) due a message from the
+ * server, returns a PGresult and sets sets *stoppos to the last byte written.
+ * On any other sort of error, returns NULL.
*/
-static bool
+static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
}
if (r == -1)
{
+ PGresult *res = PQgetResult(conn);
+
/*
- * The server closed its end of the copy stream. Close ours
- * if we haven't done so already, and exit.
+ * The server closed its end of the copy stream. If we haven't
+ * closed ours already, we need to do so now, unless the server
+ * threw an error, in which case we don't.
*/
if (still_sending)
{
/* Error message written in close_walfile() */
goto error;
}
- if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ if (PQresultStatus(res) == PGRES_COPY_IN)
{
- fprintf(stderr, _("%s: could not send copy-end packet: %s"),
- progname, PQerrorMessage(conn));
- goto error;
+ if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ {
+ fprintf(stderr,
+ _("%s: could not send copy-end packet: %s"),
+ progname, PQerrorMessage(conn));
+ goto error;
+ }
+ res = PQgetResult(conn);
}
still_sending = false;
}
if (copybuf != NULL)
PQfreemem(copybuf);
*stoppos = blockpos;
- return true;
+ return res;
}
if (r == -2)
{
error:
if (copybuf != NULL)
PQfreemem(copybuf);
- return false;
+ return NULL;
}
break;
case 'd': /* Copy Data, pass it back to caller */
return msgLength;
+ case 'c':
+ /*
+ * If this is a CopyDone message, exit COPY_OUT mode and let
+ * caller read status with PQgetResult(). If we're in
+ * COPY_BOTH mode, return to COPY_IN mode.
+ */
+ if (conn->asyncStatus == PGASYNC_COPY_BOTH)
+ conn->asyncStatus = PGASYNC_COPY_IN;
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
+ return -1;
default: /* treat as end of copy */
+ /*
+ * Any other message terminates either COPY_IN or COPY_BOTH
+ * mode.
+ */
+ conn->asyncStatus = PGASYNC_BUSY;
return -1;
}
*/
msgLength = getCopyDataMessage(conn);
if (msgLength < 0)
- {
- /*
- * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
- * read status with PQgetResult(). The normal case is that it's
- * Copy Done, but we let parseInput read that. If error, we
- * expect the state was already changed.
- */
- if (msgLength == -1)
- {
- if (conn->asyncStatus == PGASYNC_COPY_BOTH)
- conn->asyncStatus = PGASYNC_COPY_IN;
- else
- conn->asyncStatus = PGASYNC_BUSY;
- }
return msgLength; /* end-of-copy or error */
- }
if (msgLength == 0)
{
/* Don't block if async read requested */