]> granicus.if.org Git - postgresql/commitdiff
Refactor pg_receivexlog main loop code, for readability.
authorFujii Masao <fujii@postgresql.org>
Fri, 4 Jul 2014 03:00:48 +0000 (12:00 +0900)
committerFujii Masao <fujii@postgresql.org>
Fri, 4 Jul 2014 03:00:48 +0000 (12:00 +0900)
Previously the source codes for receiving the data and for
polling the socket were included in pg_receivexlog main loop.
This commit splits out them as separate functions. This is
useful for improving the readability of main loop code and
making the future pg_receivexlog-related patch simpler.

src/bin/pg_basebackup/receivelog.c

index d76e605e21e9746a0fcf50d74e7964e66544290e..4aa35da2fce359d4e735a00ed37cb744c69ff781 100644 (file)
@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                                 uint32 timeline, char *basedir,
                           stream_stop_callback stream_stop, int standby_message_timeout,
                                 char *partial_suffix, XLogRecPtr *stoppos);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms);
+static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                                                 uint32 *timeline);
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                int                     bytes_written;
                int64           now;
                int                     hdr_len;
-
-               if (copybuf != NULL)
-               {
-                       PQfreemem(copybuf);
-                       copybuf = NULL;
-               }
+               long            sleeptime;
 
                /*
                 * Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        last_status = now;
                }
 
-               r = PQgetCopyData(conn, &copybuf, 1);
-               if (r == 0)
+               /*
+                * Compute how long send/receive loops should sleep
+                */
+               if (standby_message_timeout && still_sending)
                {
-                       /*
-                        * No data available. Wait for some to appear, but not longer than
-                        * the specified timeout, so that we can ping the server.
-                        */
-                       fd_set          input_mask;
-                       struct timeval timeout;
-                       struct timeval *timeoutptr;
-
-                       FD_ZERO(&input_mask);
-                       FD_SET(PQsocket(conn), &input_mask);
-                       if (standby_message_timeout && still_sending)
+                       int64           targettime;
+                       long            secs;
+                       int                     usecs;
+
+                       targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+                       feTimestampDifference(now,
+                                                                 targettime,
+                                                                 &secs,
+                                                                 &usecs);
+                       /* Always sleep at least 1 sec */
+                       if (secs <= 0)
                        {
-                               int64           targettime;
-                               long            secs;
-                               int                     usecs;
-
-                               targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-                               feTimestampDifference(now,
-                                                                         targettime,
-                                                                         &secs,
-                                                                         &usecs);
-                               if (secs <= 0)
-                                       timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-                               else
-                                       timeout.tv_sec = secs;
-                               timeout.tv_usec = usecs;
-                               timeoutptr = &timeout;
+                               secs = 1;
+                               usecs = 0;
                        }
-                       else
-                               timeoutptr = NULL;
 
-                       r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-                       if (r == 0 || (r < 0 && errno == EINTR))
-                       {
-                               /*
-                                * Got a timeout or signal. Continue the loop and either
-                                * deliver a status packet to the server or just go back into
-                                * blocking.
-                                */
-                               continue;
-                       }
-                       else if (r < 0)
-                       {
-                               fprintf(stderr, _("%s: select() failed: %s\n"),
-                                               progname, strerror(errno));
-                               goto error;
-                       }
-                       /* Else there is actually data on the socket */
-                       if (PQconsumeInput(conn) == 0)
-                       {
-                               fprintf(stderr,
-                                               _("%s: could not receive data from WAL stream: %s"),
-                                               progname, PQerrorMessage(conn));
-                               goto error;
-                       }
-                       continue;
+                       sleeptime = secs * 1000 + usecs / 1000;
                }
+               else
+                       sleeptime = -1;
+
+               r = CopyStreamReceive(conn, sleeptime, &copybuf);
+               if (r == 0)
+                       continue;
                if (r == -1)
+                       goto error;
+               if (r == -2)
                {
                        PGresult   *res = PQgetResult(conn);
 
@@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        }
                        if (copybuf != NULL)
                                PQfreemem(copybuf);
+                       copybuf = NULL;
                        *stoppos = blockpos;
                        return res;
                }
-               if (r == -2)
-               {
-                       fprintf(stderr, _("%s: could not read COPY data: %s"),
-                                       progname, PQerrorMessage(conn));
-                       goto error;
-               }
 
                /* Check the message type. */
                if (copybuf[0] == 'k')
@@ -1056,3 +1019,115 @@ error:
                PQfreemem(copybuf);
        return NULL;
 }
+
+/*
+ * Wait until we can read CopyData message, or timeout.
+ *
+ * Returns 1 if data has become available for reading, 0 if timed out
+ * or interrupted by signal, and -1 on an error.
+ */
+static int
+CopyStreamPoll(PGconn *conn, long timeout_ms)
+{
+       int                     ret;
+       fd_set          input_mask;
+       struct timeval timeout;
+       struct timeval *timeoutptr;
+
+       if (PQsocket(conn) < 0)
+       {
+               fprintf(stderr, _("%s: socket not open"), progname);
+               return -1;
+       }
+
+       FD_ZERO(&input_mask);
+       FD_SET(PQsocket(conn), &input_mask);
+
+       if (timeout_ms < 0)
+               timeoutptr = NULL;
+       else
+       {
+               timeout.tv_sec = timeout_ms / 1000L;
+               timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
+               timeoutptr = &timeout;
+       }
+
+       ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+       if (ret == 0 || (ret < 0 && errno == EINTR))
+               return 0;               /* Got a timeout or signal */
+       else if (ret < 0)
+       {
+               fprintf(stderr, _("%s: select() failed: %s\n"),
+                               progname, strerror(errno));
+               return -1;
+       }
+
+       return 1;
+}
+
+/*
+ * Receive CopyData message available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to a buffer holding the received message. The buffer is only valid
+ * until the next CopyStreamReceive call.
+ *
+ * 0 if no data was available within timeout, or wait was interrupted
+ * by signal. -1 on error. -2 if the server ended the COPY.
+ */
+static int
+CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+{
+       static char        *copybuf = NULL;
+       int                     rawlen;
+
+       if (copybuf != NULL)
+               PQfreemem(copybuf);
+       copybuf = NULL;
+       *buffer = NULL;
+
+       /* Try to receive a CopyData message */
+       rawlen = PQgetCopyData(conn, &copybuf, 1);
+       if (rawlen == 0)
+       {
+               /*
+                * No data available. Wait for some to appear, but not longer than
+                * the specified timeout, so that we can ping the server.
+                */
+               if (timeout > 0)
+               {
+                       int             ret;
+
+                       ret = CopyStreamPoll(conn, timeout);
+                       if (ret <= 0)
+                               return ret;
+               }
+
+               /* Else there is actually data on the socket */
+               if (PQconsumeInput(conn) == 0)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not receive data from WAL stream: %s"),
+                                       progname, PQerrorMessage(conn));
+                       return -1;
+               }
+
+               /* Now that we've consumed some input, try again */
+               rawlen = PQgetCopyData(conn, &copybuf, 1);
+               if (rawlen == 0)
+                       return 0;
+       }
+       if (rawlen == -1)                       /* end-of-streaming or error */
+               return -2;
+       if (rawlen == -2)
+       {
+               fprintf(stderr, _("%s: could not read COPY data: %s"),
+                               progname, PQerrorMessage(conn));
+               return -1;
+       }
+
+       /* Return received messages to caller */
+       *buffer = copybuf;
+       return rawlen;
+}