From: Fujii Masao <fujii@postgresql.org>
Date: Wed, 6 Aug 2014 11:58:13 +0000 (+0900)
Subject: Refactor pg_receivexlog main loop code, for readability, take 2.
X-Git-Tag: REL9_5_ALPHA1~1655
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=6805e02c66eac3857ef1a3d6cfd1ffeea64d9447;p=postgresql

Refactor pg_receivexlog main loop code, for readability, take 2.

Previously the source codes for processing the received data and handling
the end of stream were included in pg_receivexlog main loop. This commit
splits out them as separate functions. This is useful for improving the
readability of main loop code and making the future pg_receivexlog-related
patch simpler.
---

diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index a260881517..d28e13b4d8 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -31,12 +31,23 @@ static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
+static bool still_sending = true;		/* feedback still needs to be sent? */
+
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
 				 uint32 timeline, char *basedir,
 			   stream_stop_callback stream_stop, int standby_message_timeout,
 				 char *partial_suffix, XLogRecPtr *stoppos);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+								XLogRecPtr blockpos, int64 *last_status);
+static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+							   XLogRecPtr *blockpos, uint32 timeline,
+							   char *basedir, stream_stop_callback stream_stop,
+							   char *partial_suffix);
+static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+									   XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+									   XLogRecPtr *stoppos);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
@@ -740,16 +751,13 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 	char	   *copybuf = NULL;
 	int64		last_status = -1;
 	XLogRecPtr	blockpos = startpos;
-	bool		still_sending = true;
+
+	still_sending = true;
 
 	while (1)
 	{
 		int			r;
-		int			xlogoff;
-		int			bytes_left;
-		int			bytes_written;
 		int64		now;
-		int			hdr_len;
 		long		sleeptime;
 
 		/*
@@ -818,198 +826,26 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			goto error;
 		if (r == -2)
 		{
-			PGresult   *res = PQgetResult(conn);
-
-			/*
-			 * The server closed its end of the copy stream.  If we haven't
-			 * closed ours already, we need to do so now, unless the server
-			 * threw an error, in which case we don't.
-			 */
-			if (still_sending)
-			{
-				if (!close_walfile(basedir, partial_suffix, blockpos))
-				{
-					/* Error message written in close_walfile() */
-					PQclear(res);
-					goto error;
-				}
-				if (PQresultStatus(res) == PGRES_COPY_IN)
-				{
-					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
-					{
-						fprintf(stderr,
-								_("%s: could not send copy-end packet: %s"),
-								progname, PQerrorMessage(conn));
-						PQclear(res);
-						goto error;
-					}
-					PQclear(res);
-					res = PQgetResult(conn);
-				}
-				still_sending = false;
-			}
-			if (copybuf != NULL)
-				PQfreemem(copybuf);
-			copybuf = NULL;
-			*stoppos = blockpos;
-			return res;
+			PGresult	*res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+													 basedir, partial_suffix, stoppos);
+			if (res == NULL)
+				goto error;
+			else
+				return res;
 		}
 
 		/* Check the message type. */
 		if (copybuf[0] == 'k')
 		{
-			int			pos;
-			bool		replyRequested;
-
-			/*
-			 * Parse the keepalive message, enclosed in the CopyData message.
-			 * We just check if the server requested a reply, and ignore the
-			 * rest.
-			 */
-			pos = 1;			/* skip msgtype 'k' */
-			pos += 8;			/* skip walEnd */
-			pos += 8;			/* skip sendTime */
-
-			if (r < pos + 1)
-			{
-				fprintf(stderr, _("%s: streaming header too small: %d\n"),
-						progname, r);
+			if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+									 &last_status))
 				goto error;
-			}
-			replyRequested = copybuf[pos];
-
-			/* If the server requested an immediate reply, send one. */
-			if (replyRequested && still_sending)
-			{
-				now = feGetCurrentTimestamp();
-				if (!sendFeedback(conn, blockpos, now, false))
-					goto error;
-				last_status = now;
-			}
 		}
 		else if (copybuf[0] == 'w')
 		{
-			/*
-			 * Once we've decided we don't want to receive any more, just
-			 * ignore any subsequent XLogData messages.
-			 */
-			if (!still_sending)
-				continue;
-
-			/*
-			 * Read the header of the XLogData message, enclosed in the
-			 * CopyData message. We only need the WAL location field
-			 * (dataStart), the rest of the header is ignored.
-			 */
-			hdr_len = 1;		/* msgtype 'w' */
-			hdr_len += 8;		/* dataStart */
-			hdr_len += 8;		/* walEnd */
-			hdr_len += 8;		/* sendTime */
-			if (r < hdr_len)
-			{
-				fprintf(stderr, _("%s: streaming header too small: %d\n"),
-						progname, r);
+			if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+									timeline, basedir, stream_stop, partial_suffix))
 				goto error;
-			}
-			blockpos = fe_recvint64(&copybuf[1]);
-
-			/* Extract WAL location for this block */
-			xlogoff = blockpos % XLOG_SEG_SIZE;
-
-			/*
-			 * Verify that the initial location in the stream matches where we
-			 * think we are.
-			 */
-			if (walfile == -1)
-			{
-				/* No file open yet */
-				if (xlogoff != 0)
-				{
-					fprintf(stderr,
-							_("%s: received transaction log record for offset %u with no file open\n"),
-							progname, xlogoff);
-					goto error;
-				}
-			}
-			else
-			{
-				/* More data in existing segment */
-				/* XXX: store seek value don't reseek all the time */
-				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
-				{
-					fprintf(stderr,
-						  _("%s: got WAL data offset %08x, expected %08x\n"),
-					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
-					goto error;
-				}
-			}
-
-			bytes_left = r - hdr_len;
-			bytes_written = 0;
-
-			while (bytes_left)
-			{
-				int			bytes_to_write;
-
-				/*
-				 * If crossing a WAL boundary, only write up until we reach
-				 * XLOG_SEG_SIZE.
-				 */
-				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
-					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
-				else
-					bytes_to_write = bytes_left;
-
-				if (walfile == -1)
-				{
-					if (!open_walfile(blockpos, timeline,
-									  basedir, partial_suffix))
-					{
-						/* Error logged by open_walfile */
-						goto error;
-					}
-				}
-
-				if (write(walfile,
-						  copybuf + hdr_len + bytes_written,
-						  bytes_to_write) != bytes_to_write)
-				{
-					fprintf(stderr,
-							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
-							progname, bytes_to_write, current_walfile_name,
-							strerror(errno));
-					goto error;
-				}
-
-				/* Write was successful, advance our position */
-				bytes_written += bytes_to_write;
-				bytes_left -= bytes_to_write;
-				blockpos += bytes_to_write;
-				xlogoff += bytes_to_write;
-
-				/* Did we reach the end of a WAL segment? */
-				if (blockpos % XLOG_SEG_SIZE == 0)
-				{
-					if (!close_walfile(basedir, partial_suffix, blockpos))
-						/* Error message written in close_walfile() */
-						goto error;
-
-					xlogoff = 0;
-
-					if (still_sending && stream_stop(blockpos, timeline, true))
-					{
-						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
-						{
-							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
-									progname, PQerrorMessage(conn));
-							goto error;
-						}
-						still_sending = false;
-						break;	/* ignore the rest of this XLogData packet */
-					}
-				}
-			}
-			/* No more data left to write, receive next copy packet */
 		}
 		else
 		{
@@ -1135,3 +971,225 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
 	*buffer = copybuf;
 	return rawlen;
 }
+
+/*
+ * Process the keepalive message.
+ */
+static bool
+ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+					XLogRecPtr blockpos, int64 *last_status)
+{
+	int			pos;
+	bool		replyRequested;
+	int64		now;
+
+	/*
+	 * Parse the keepalive message, enclosed in the CopyData message.
+	 * We just check if the server requested a reply, and ignore the
+	 * rest.
+	 */
+	pos = 1;			/* skip msgtype 'k' */
+	pos += 8;			/* skip walEnd */
+	pos += 8;			/* skip sendTime */
+
+	if (len < pos + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, len);
+		return false;
+	}
+	replyRequested = copybuf[pos];
+
+	/* If the server requested an immediate reply, send one. */
+	if (replyRequested && still_sending)
+	{
+		now = feGetCurrentTimestamp();
+		if (!sendFeedback(conn, blockpos, now, false))
+			return false;
+		*last_status = now;
+	}
+
+	return true;
+}
+
+/*
+ * Process XLogData message.
+ */
+static bool
+ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+				   XLogRecPtr *blockpos, uint32 timeline,
+				   char *basedir, stream_stop_callback stream_stop,
+				   char *partial_suffix)
+{
+	int			xlogoff;
+	int			bytes_left;
+	int			bytes_written;
+	int			hdr_len;
+
+	/*
+	 * Once we've decided we don't want to receive any more, just
+	 * ignore any subsequent XLogData messages.
+	 */
+	if (!(still_sending))
+		return true;
+
+	/*
+	 * Read the header of the XLogData message, enclosed in the
+	 * CopyData message. We only need the WAL location field
+	 * (dataStart), the rest of the header is ignored.
+	 */
+	hdr_len = 1;		/* msgtype 'w' */
+	hdr_len += 8;		/* dataStart */
+	hdr_len += 8;		/* walEnd */
+	hdr_len += 8;		/* sendTime */
+	if (len < hdr_len)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, len);
+		return false;
+	}
+	*blockpos = fe_recvint64(&copybuf[1]);
+
+	/* Extract WAL location for this block */
+	xlogoff = *blockpos % XLOG_SEG_SIZE;
+
+	/*
+	 * Verify that the initial location in the stream matches where we
+	 * think we are.
+	 */
+	if (walfile == -1)
+	{
+		/* No file open yet */
+		if (xlogoff != 0)
+		{
+			fprintf(stderr,
+					_("%s: received transaction log record for offset %u with no file open\n"),
+					progname, xlogoff);
+			return false;
+		}
+	}
+	else
+	{
+		/* More data in existing segment */
+		/* XXX: store seek value don't reseek all the time */
+		if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+		{
+			fprintf(stderr,
+					_("%s: got WAL data offset %08x, expected %08x\n"),
+					progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+			return false;
+		}
+	}
+
+	bytes_left = len - hdr_len;
+	bytes_written = 0;
+
+	while (bytes_left)
+	{
+		int			bytes_to_write;
+
+		/*
+		 * If crossing a WAL boundary, only write up until we reach
+		 * XLOG_SEG_SIZE.
+		 */
+		if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+			bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+		else
+			bytes_to_write = bytes_left;
+
+		if (walfile == -1)
+		{
+			if (!open_walfile(*blockpos, timeline,
+							  basedir, partial_suffix))
+			{
+				/* Error logged by open_walfile */
+				return false;
+			}
+		}
+
+		if (write(walfile,
+				  copybuf + hdr_len + bytes_written,
+				  bytes_to_write) != bytes_to_write)
+		{
+			fprintf(stderr,
+					_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
+					progname, bytes_to_write, current_walfile_name,
+					strerror(errno));
+			return false;
+		}
+
+		/* Write was successful, advance our position */
+		bytes_written += bytes_to_write;
+		bytes_left -= bytes_to_write;
+		*blockpos += bytes_to_write;
+		xlogoff += bytes_to_write;
+
+		/* Did we reach the end of a WAL segment? */
+		if (*blockpos % XLOG_SEG_SIZE == 0)
+		{
+			if (!close_walfile(basedir, partial_suffix, *blockpos))
+				/* Error message written in close_walfile() */
+				return false;
+
+			xlogoff = 0;
+
+			if (still_sending && stream_stop(*blockpos, timeline, true))
+			{
+				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+				{
+					fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+							progname, PQerrorMessage(conn));
+					return false;
+				}
+				still_sending = false;
+				return true;	/* ignore the rest of this XLogData packet */
+			}
+		}
+	}
+	/* No more data left to write, receive next copy packet */
+
+	return true;
+}
+
+/*
+ * Handle end of the copy stream.
+ */
+static PGresult *
+HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+					  XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+					  XLogRecPtr *stoppos)
+{
+	PGresult   *res = PQgetResult(conn);
+
+	/*
+	 * The server closed its end of the copy stream.  If we haven't
+	 * closed ours already, we need to do so now, unless the server
+	 * threw an error, in which case we don't.
+	 */
+	if (still_sending)
+	{
+		if (!close_walfile(basedir, partial_suffix, blockpos))
+		{
+			/* Error message written in close_walfile() */
+			PQclear(res);
+			return NULL;
+		}
+		if (PQresultStatus(res) == PGRES_COPY_IN)
+		{
+			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+			{
+				fprintf(stderr,
+						_("%s: could not send copy-end packet: %s"),
+						progname, PQerrorMessage(conn));
+				PQclear(res);
+				return NULL;
+			}
+			res = PQgetResult(conn);
+		}
+		still_sending = false;
+	}
+	if (copybuf != NULL)
+		PQfreemem(copybuf);
+	*stoppos = blockpos;
+	return res;
+}