]> granicus.if.org Git - postgresql/commitdiff
Refactor pg_receivexlog main loop code, for readability, take 2.
authorFujii Masao <fujii@postgresql.org>
Wed, 6 Aug 2014 11:58:13 +0000 (20:58 +0900)
committerFujii Masao <fujii@postgresql.org>
Wed, 6 Aug 2014 11:58:13 +0000 (20:58 +0900)
Previously the source codes for processing the received data and handling
the end of stream were included in pg_receivexlog main loop. This commit
splits out them as separate functions. This is useful for improving the
readability of main loop code and making the future pg_receivexlog-related
patch simpler.

src/bin/pg_basebackup/receivelog.c

index a260881517de995190e3fccedd8d5d7f322d2770..d28e13b4d8c9f3b9433aee886e2c8596a12ffa29 100644 (file)
@@ -31,12 +31,23 @@ static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
+static bool still_sending = true;              /* feedback still needs to be sent? */
+
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                                 uint32 timeline, char *basedir,
                           stream_stop_callback stream_stop, int standby_message_timeout,
                                 char *partial_suffix, XLogRecPtr *stoppos);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+                                                               XLogRecPtr blockpos, int64 *last_status);
+static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+                                                          XLogRecPtr *blockpos, uint32 timeline,
+                                                          char *basedir, stream_stop_callback stream_stop,
+                                                          char *partial_suffix);
+static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+                                                                          XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+                                                                          XLogRecPtr *stoppos);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                                                 uint32 *timeline);
@@ -740,16 +751,13 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        char       *copybuf = NULL;
        int64           last_status = -1;
        XLogRecPtr      blockpos = startpos;
-       bool            still_sending = true;
+
+       still_sending = true;
 
        while (1)
        {
                int                     r;
-               int                     xlogoff;
-               int                     bytes_left;
-               int                     bytes_written;
                int64           now;
-               int                     hdr_len;
                long            sleeptime;
 
                /*
@@ -818,198 +826,26 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        goto error;
                if (r == -2)
                {
-                       PGresult   *res = PQgetResult(conn);
-
-                       /*
-                        * The server closed its end of the copy stream.  If we haven't
-                        * closed ours already, we need to do so now, unless the server
-                        * threw an error, in which case we don't.
-                        */
-                       if (still_sending)
-                       {
-                               if (!close_walfile(basedir, partial_suffix, blockpos))
-                               {
-                                       /* Error message written in close_walfile() */
-                                       PQclear(res);
-                                       goto error;
-                               }
-                               if (PQresultStatus(res) == PGRES_COPY_IN)
-                               {
-                                       if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
-                                       {
-                                               fprintf(stderr,
-                                                               _("%s: could not send copy-end packet: %s"),
-                                                               progname, PQerrorMessage(conn));
-                                               PQclear(res);
-                                               goto error;
-                                       }
-                                       PQclear(res);
-                                       res = PQgetResult(conn);
-                               }
-                               still_sending = false;
-                       }
-                       if (copybuf != NULL)
-                               PQfreemem(copybuf);
-                       copybuf = NULL;
-                       *stoppos = blockpos;
-                       return res;
+                       PGresult        *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+                                                                                                        basedir, partial_suffix, stoppos);
+                       if (res == NULL)
+                               goto error;
+                       else
+                               return res;
                }
 
                /* Check the message type. */
                if (copybuf[0] == 'k')
                {
-                       int                     pos;
-                       bool            replyRequested;
-
-                       /*
-                        * Parse the keepalive message, enclosed in the CopyData message.
-                        * We just check if the server requested a reply, and ignore the
-                        * rest.
-                        */
-                       pos = 1;                        /* skip msgtype 'k' */
-                       pos += 8;                       /* skip walEnd */
-                       pos += 8;                       /* skip sendTime */
-
-                       if (r < pos + 1)
-                       {
-                               fprintf(stderr, _("%s: streaming header too small: %d\n"),
-                                               progname, r);
+                       if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+                                                                        &last_status))
                                goto error;
-                       }
-                       replyRequested = copybuf[pos];
-
-                       /* If the server requested an immediate reply, send one. */
-                       if (replyRequested && still_sending)
-                       {
-                               now = feGetCurrentTimestamp();
-                               if (!sendFeedback(conn, blockpos, now, false))
-                                       goto error;
-                               last_status = now;
-                       }
                }
                else if (copybuf[0] == 'w')
                {
-                       /*
-                        * Once we've decided we don't want to receive any more, just
-                        * ignore any subsequent XLogData messages.
-                        */
-                       if (!still_sending)
-                               continue;
-
-                       /*
-                        * Read the header of the XLogData message, enclosed in the
-                        * CopyData message. We only need the WAL location field
-                        * (dataStart), the rest of the header is ignored.
-                        */
-                       hdr_len = 1;            /* msgtype 'w' */
-                       hdr_len += 8;           /* dataStart */
-                       hdr_len += 8;           /* walEnd */
-                       hdr_len += 8;           /* sendTime */
-                       if (r < hdr_len)
-                       {
-                               fprintf(stderr, _("%s: streaming header too small: %d\n"),
-                                               progname, r);
+                       if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+                                                                       timeline, basedir, stream_stop, partial_suffix))
                                goto error;
-                       }
-                       blockpos = fe_recvint64(&copybuf[1]);
-
-                       /* Extract WAL location for this block */
-                       xlogoff = blockpos % XLOG_SEG_SIZE;
-
-                       /*
-                        * Verify that the initial location in the stream matches where we
-                        * think we are.
-                        */
-                       if (walfile == -1)
-                       {
-                               /* No file open yet */
-                               if (xlogoff != 0)
-                               {
-                                       fprintf(stderr,
-                                                       _("%s: received transaction log record for offset %u with no file open\n"),
-                                                       progname, xlogoff);
-                                       goto error;
-                               }
-                       }
-                       else
-                       {
-                               /* More data in existing segment */
-                               /* XXX: store seek value don't reseek all the time */
-                               if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
-                               {
-                                       fprintf(stderr,
-                                                 _("%s: got WAL data offset %08x, expected %08x\n"),
-                                          progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
-                                       goto error;
-                               }
-                       }
-
-                       bytes_left = r - hdr_len;
-                       bytes_written = 0;
-
-                       while (bytes_left)
-                       {
-                               int                     bytes_to_write;
-
-                               /*
-                                * If crossing a WAL boundary, only write up until we reach
-                                * XLOG_SEG_SIZE.
-                                */
-                               if (xlogoff + bytes_left > XLOG_SEG_SIZE)
-                                       bytes_to_write = XLOG_SEG_SIZE - xlogoff;
-                               else
-                                       bytes_to_write = bytes_left;
-
-                               if (walfile == -1)
-                               {
-                                       if (!open_walfile(blockpos, timeline,
-                                                                         basedir, partial_suffix))
-                                       {
-                                               /* Error logged by open_walfile */
-                                               goto error;
-                                       }
-                               }
-
-                               if (write(walfile,
-                                                 copybuf + hdr_len + bytes_written,
-                                                 bytes_to_write) != bytes_to_write)
-                               {
-                                       fprintf(stderr,
-                                                       _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
-                                                       progname, bytes_to_write, current_walfile_name,
-                                                       strerror(errno));
-                                       goto error;
-                               }
-
-                               /* Write was successful, advance our position */
-                               bytes_written += bytes_to_write;
-                               bytes_left -= bytes_to_write;
-                               blockpos += bytes_to_write;
-                               xlogoff += bytes_to_write;
-
-                               /* Did we reach the end of a WAL segment? */
-                               if (blockpos % XLOG_SEG_SIZE == 0)
-                               {
-                                       if (!close_walfile(basedir, partial_suffix, blockpos))
-                                               /* Error message written in close_walfile() */
-                                               goto error;
-
-                                       xlogoff = 0;
-
-                                       if (still_sending && stream_stop(blockpos, timeline, true))
-                                       {
-                                               if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
-                                               {
-                                                       fprintf(stderr, _("%s: could not send copy-end packet: %s"),
-                                                                       progname, PQerrorMessage(conn));
-                                                       goto error;
-                                               }
-                                               still_sending = false;
-                                               break;  /* ignore the rest of this XLogData packet */
-                                       }
-                               }
-                       }
-                       /* No more data left to write, receive next copy packet */
                }
                else
                {
@@ -1135,3 +971,225 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
        *buffer = copybuf;
        return rawlen;
 }
+
+/*
+ * Process the keepalive message.
+ */
+static bool
+ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+                                       XLogRecPtr blockpos, int64 *last_status)
+{
+       int                     pos;
+       bool            replyRequested;
+       int64           now;
+
+       /*
+        * Parse the keepalive message, enclosed in the CopyData message.
+        * We just check if the server requested a reply, and ignore the
+        * rest.
+        */
+       pos = 1;                        /* skip msgtype 'k' */
+       pos += 8;                       /* skip walEnd */
+       pos += 8;                       /* skip sendTime */
+
+       if (len < pos + 1)
+       {
+               fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                               progname, len);
+               return false;
+       }
+       replyRequested = copybuf[pos];
+
+       /* If the server requested an immediate reply, send one. */
+       if (replyRequested && still_sending)
+       {
+               now = feGetCurrentTimestamp();
+               if (!sendFeedback(conn, blockpos, now, false))
+                       return false;
+               *last_status = now;
+       }
+
+       return true;
+}
+
+/*
+ * Process XLogData message.
+ */
+static bool
+ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+                                  XLogRecPtr *blockpos, uint32 timeline,
+                                  char *basedir, stream_stop_callback stream_stop,
+                                  char *partial_suffix)
+{
+       int                     xlogoff;
+       int                     bytes_left;
+       int                     bytes_written;
+       int                     hdr_len;
+
+       /*
+        * Once we've decided we don't want to receive any more, just
+        * ignore any subsequent XLogData messages.
+        */
+       if (!(still_sending))
+               return true;
+
+       /*
+        * Read the header of the XLogData message, enclosed in the
+        * CopyData message. We only need the WAL location field
+        * (dataStart), the rest of the header is ignored.
+        */
+       hdr_len = 1;            /* msgtype 'w' */
+       hdr_len += 8;           /* dataStart */
+       hdr_len += 8;           /* walEnd */
+       hdr_len += 8;           /* sendTime */
+       if (len < hdr_len)
+       {
+               fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                               progname, len);
+               return false;
+       }
+       *blockpos = fe_recvint64(&copybuf[1]);
+
+       /* Extract WAL location for this block */
+       xlogoff = *blockpos % XLOG_SEG_SIZE;
+
+       /*
+        * Verify that the initial location in the stream matches where we
+        * think we are.
+        */
+       if (walfile == -1)
+       {
+               /* No file open yet */
+               if (xlogoff != 0)
+               {
+                       fprintf(stderr,
+                                       _("%s: received transaction log record for offset %u with no file open\n"),
+                                       progname, xlogoff);
+                       return false;
+               }
+       }
+       else
+       {
+               /* More data in existing segment */
+               /* XXX: store seek value don't reseek all the time */
+               if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+               {
+                       fprintf(stderr,
+                                       _("%s: got WAL data offset %08x, expected %08x\n"),
+                                       progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+                       return false;
+               }
+       }
+
+       bytes_left = len - hdr_len;
+       bytes_written = 0;
+
+       while (bytes_left)
+       {
+               int                     bytes_to_write;
+
+               /*
+                * If crossing a WAL boundary, only write up until we reach
+                * XLOG_SEG_SIZE.
+                */
+               if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+                       bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+               else
+                       bytes_to_write = bytes_left;
+
+               if (walfile == -1)
+               {
+                       if (!open_walfile(*blockpos, timeline,
+                                                         basedir, partial_suffix))
+                       {
+                               /* Error logged by open_walfile */
+                               return false;
+                       }
+               }
+
+               if (write(walfile,
+                                 copybuf + hdr_len + bytes_written,
+                                 bytes_to_write) != bytes_to_write)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
+                                       progname, bytes_to_write, current_walfile_name,
+                                       strerror(errno));
+                       return false;
+               }
+
+               /* Write was successful, advance our position */
+               bytes_written += bytes_to_write;
+               bytes_left -= bytes_to_write;
+               *blockpos += bytes_to_write;
+               xlogoff += bytes_to_write;
+
+               /* Did we reach the end of a WAL segment? */
+               if (*blockpos % XLOG_SEG_SIZE == 0)
+               {
+                       if (!close_walfile(basedir, partial_suffix, *blockpos))
+                               /* Error message written in close_walfile() */
+                               return false;
+
+                       xlogoff = 0;
+
+                       if (still_sending && stream_stop(*blockpos, timeline, true))
+                       {
+                               if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+                               {
+                                       fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                                                       progname, PQerrorMessage(conn));
+                                       return false;
+                               }
+                               still_sending = false;
+                               return true;    /* ignore the rest of this XLogData packet */
+                       }
+               }
+       }
+       /* No more data left to write, receive next copy packet */
+
+       return true;
+}
+
+/*
+ * Handle end of the copy stream.
+ */
+static PGresult *
+HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+                                         XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+                                         XLogRecPtr *stoppos)
+{
+       PGresult   *res = PQgetResult(conn);
+
+       /*
+        * The server closed its end of the copy stream.  If we haven't
+        * closed ours already, we need to do so now, unless the server
+        * threw an error, in which case we don't.
+        */
+       if (still_sending)
+       {
+               if (!close_walfile(basedir, partial_suffix, blockpos))
+               {
+                       /* Error message written in close_walfile() */
+                       PQclear(res);
+                       return NULL;
+               }
+               if (PQresultStatus(res) == PGRES_COPY_IN)
+               {
+                       if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+                       {
+                               fprintf(stderr,
+                                               _("%s: could not send copy-end packet: %s"),
+                                               progname, PQerrorMessage(conn));
+                               PQclear(res);
+                               return NULL;
+                       }
+                       res = PQgetResult(conn);
+               }
+               still_sending = false;
+       }
+       if (copybuf != NULL)
+               PQfreemem(copybuf);
+       *stoppos = blockpos;
+       return res;
+}