]> granicus.if.org Git - postgresql/commitdiff
Avoid slow shutdown of pg_basebackup.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 27 Apr 2017 22:27:02 +0000 (18:27 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 27 Apr 2017 22:27:02 +0000 (18:27 -0400)
pg_basebackup's child process did not pay any attention to the pipe
from its parent while waiting for input from the source server.
If no server data was arriving, it would only wake up and check the
pipe every standby_message_timeout or so.  This creates a problem
since the parent process might determine and send the desired stop
position only after the server has reached end-of-WAL and stopped
sending data.  In the src/test/recovery regression tests, the timing
is repeatably such that it takes nearly 10 seconds for the child
process to realize that it should shut down.  It's not clear how
often that would happen in real-world cases, but it sure seems like
a bug --- and if the user turns off standby_message_timeout or sets
it very large, the delay could be a lot worse.

To fix, expand the StreamCtl API to allow the pipe input FD to be
passed down to the low-level wait routine, and watch both sockets
when sleeping.

(Note: AFAICS this issue doesn't affect the Windows port, since
it doesn't rely on a pipe to transfer the stop position to the
child thread.)

Discussion: https://postgr.es/m/6456.1493263884@sss.pgh.pa.us

src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivewal.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h

index 40ec0e17dc5b3f445c0ee07285ce27b937049265..e2a2ebb30f9c3399a7f00a7bee114789d130c7e5 100644 (file)
@@ -480,6 +480,11 @@ LogStreamerMain(logstreamer_param *param)
        stream.timeline = param->timeline;
        stream.sysidentifier = param->sysidentifier;
        stream.stream_stop = reached_end_position;
+#ifndef WIN32
+       stream.stop_socket = bgpipe[0];
+#else
+       stream.stop_socket = PGINVALID_SOCKET;
+#endif
        stream.standby_message_timeout = standby_message_timeout;
        stream.synchronous = false;
        stream.do_sync = do_sync;
index 1a9fe81be14b4918f44ed23007da6d957f367d6b..09385c5cbfcf8b3f8a201781fe0af52973bdd40b 100644 (file)
@@ -409,6 +409,7 @@ StreamLog(void)
                                stream.timeline);
 
        stream.stream_stop = stop_streaming;
+       stream.stop_socket = PGINVALID_SOCKET;
        stream.standby_message_timeout = standby_message_timeout;
        stream.synchronous = synchronous;
        stream.do_sync = true;
index 8511e57cf7df2390fc63dbcef286bab40e2d91db..c41bba28cdfb79562233b40228b2ee6d9dd0932b 100644 (file)
@@ -39,8 +39,9 @@ static bool still_sending = true;             /* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
                                 XLogRecPtr *stoppos);
-static int     CopyStreamPoll(PGconn *conn, long timeout_ms);
-static int     CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+static int     CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
+static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+                                 char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
                                        int len, XLogRecPtr blockpos, TimestampTz *last_status);
 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
@@ -417,8 +418,15 @@ CheckServerVersionForStreaming(PGconn *conn)
  * return. As long as it returns false, streaming will continue
  * indefinitely.
  *
+ * If stream_stop() checks for external input, stop_socket should be set to
+ * the FD it checks.  This will allow such input to be detected promptly
+ * rather than after standby_message_timeout (which might be indefinite).
+ * Note that signals will interrupt waits for input as well, but that is
+ * race-y since a signal received while busy won't interrupt the wait.
+ *
  * standby_message_timeout controls how often we send a message
  * back to the master letting it know our progress, in milliseconds.
+ * Zero means no messages are sent.
  * This message will only contain the write location, and never
  * flush or replay.
  *
@@ -825,7 +833,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
                sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                                                                 last_status);
 
-               r = CopyStreamReceive(conn, sleeptime, &copybuf);
+               r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
                while (r != 0)
                {
                        if (r == -1)
@@ -870,7 +878,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
                         * Process the received data, and any subsequent data we can read
                         * without blocking.
                         */
-                       r = CopyStreamReceive(conn, 0, &copybuf);
+                       r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
                }
        }
 
@@ -881,20 +889,25 @@ error:
 }
 
 /*
- * Wait until we can read CopyData message, or timeout.
+ * Wait until we can read a CopyData message,
+ * or timeout, or occurrence of a signal or input on the stop_socket.
+ * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
  *
  * Returns 1 if data has become available for reading, 0 if timed out
- * or interrupted by signal, and -1 on an error.
+ * or interrupted by signal or stop_socket input, and -1 on an error.
  */
 static int
-CopyStreamPoll(PGconn *conn, long timeout_ms)
+CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 {
        int                     ret;
        fd_set          input_mask;
+       int                     connsocket;
+       int                     maxfd;
        struct timeval timeout;
        struct timeval *timeoutptr;
 
-       if (PQsocket(conn) < 0)
+       connsocket = PQsocket(conn);
+       if (connsocket < 0)
        {
                fprintf(stderr, _("%s: invalid socket: %s"), progname,
                                PQerrorMessage(conn));
@@ -902,7 +915,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
        }
 
        FD_ZERO(&input_mask);
-       FD_SET(PQsocket(conn), &input_mask);
+       FD_SET(connsocket, &input_mask);
+       maxfd = connsocket;
+       if (stop_socket != PGINVALID_SOCKET)
+       {
+               FD_SET(stop_socket, &input_mask);
+               maxfd = Max(maxfd, stop_socket);
+       }
 
        if (timeout_ms < 0)
                timeoutptr = NULL;
@@ -913,17 +932,20 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
                timeoutptr = &timeout;
        }
 
-       ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-       if (ret == 0 || (ret < 0 && errno == EINTR))
-               return 0;                               /* Got a timeout or signal */
-       else if (ret < 0)
+       ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
+
+       if (ret < 0)
        {
+               if (errno == EINTR)
+                       return 0;                       /* Got a signal, so not an error */
                fprintf(stderr, _("%s: select() failed: %s\n"),
                                progname, strerror(errno));
                return -1;
        }
+       if (ret > 0 && FD_ISSET(connsocket, &input_mask))
+               return 1;                               /* Got input on connection socket */
 
-       return 1;
+       return 0;                                       /* Got timeout or input on stop_socket */
 }
 
 /*
@@ -934,11 +956,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
  * point to a buffer holding the received message. The buffer is only valid
  * until the next CopyStreamReceive call.
  *
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal. -1 on error. -2 if the server ended the COPY.
+ * Returns 0 if no data was available within timeout, or if wait was
+ * interrupted by signal or stop_socket input.
+ * -1 on error. -2 if the server ended the COPY.
  */
 static int
-CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+                                 char **buffer)
 {
        char       *copybuf = NULL;
        int                     rawlen;
@@ -951,20 +975,18 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
        rawlen = PQgetCopyData(conn, &copybuf, 1);
        if (rawlen == 0)
        {
+               int                     ret;
+
                /*
-                * No data available. Wait for some to appear, but not longer than the
-                * specified timeout, so that we can ping the server.
+                * No data available.  Wait for some to appear, but not longer than
+                * the specified timeout, so that we can ping the server.  Also stop
+                * waiting if input appears on stop_socket.
                 */
-               if (timeout != 0)
-               {
-                       int                     ret;
-
-                       ret = CopyStreamPoll(conn, timeout);
-                       if (ret <= 0)
-                               return ret;
-               }
+               ret = CopyStreamPoll(conn, timeout, stop_socket);
+               if (ret <= 0)
+                       return ret;
 
-               /* Else there is actually data on the socket */
+               /* Now there is actually data on the socket */
                if (PQconsumeInput(conn) == 0)
                {
                        fprintf(stderr,
index 42e93ac7454a116e7c6ee29c014bb5c7d2dbd746..9a51d9a9c4926d7c5493f2e7c24b9cbfc133b7be 100644 (file)
@@ -42,6 +42,9 @@ typedef struct StreamCtl
 
        stream_stop_callback stream_stop;       /* Stop streaming when returns true */
 
+       pgsocket        stop_socket;    /* if valid, watch for input on this socket
+                                                                * and check stream_stop() when there is any */
+
        WalWriteMethod *walmethod;      /* How to write the WAL */
        char       *partial_suffix; /* Suffix appended to partially received files */
        char       *replication_slot;           /* Replication slot to use, or NULL */