From 7834d20b57a4320308c3f8262fabf898f89e6a71 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 27 Apr 2017 18:27:02 -0400 Subject: [PATCH] Avoid slow shutdown of pg_basebackup. pg_basebackup's child process did not pay any attention to the pipe from its parent while waiting for input from the source server. If no server data was arriving, it would only wake up and check the pipe every standby_message_timeout or so. This creates a problem since the parent process might determine and send the desired stop position only after the server has reached end-of-WAL and stopped sending data. In the src/test/recovery regression tests, the timing is repeatably such that it takes nearly 10 seconds for the child process to realize that it should shut down. It's not clear how often that would happen in real-world cases, but it sure seems like a bug --- and if the user turns off standby_message_timeout or sets it very large, the delay could be a lot worse. To fix, expand the StreamCtl API to allow the pipe input FD to be passed down to the low-level wait routine, and watch both sockets when sleeping. (Note: AFAICS this issue doesn't affect the Windows port, since it doesn't rely on a pipe to transfer the stop position to the child thread.) 
Discussion: https://postgr.es/m/6456.1493263884@sss.pgh.pa.us --- src/bin/pg_basebackup/pg_basebackup.c | 5 ++ src/bin/pg_basebackup/pg_receivewal.c | 1 + src/bin/pg_basebackup/receivelog.c | 78 +++++++++++++++++---------- src/bin/pg_basebackup/receivelog.h | 3 ++ 4 files changed, 59 insertions(+), 28 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 40ec0e17dc..e2a2ebb30f 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -480,6 +480,11 @@ LogStreamerMain(logstreamer_param *param) stream.timeline = param->timeline; stream.sysidentifier = param->sysidentifier; stream.stream_stop = reached_end_position; +#ifndef WIN32 + stream.stop_socket = bgpipe[0]; +#else + stream.stop_socket = PGINVALID_SOCKET; +#endif stream.standby_message_timeout = standby_message_timeout; stream.synchronous = false; stream.do_sync = do_sync; diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 1a9fe81be1..09385c5cbf 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -409,6 +409,7 @@ StreamLog(void) stream.timeline); stream.stream_stop = stop_streaming; + stream.stop_socket = PGINVALID_SOCKET; stream.standby_message_timeout = standby_message_timeout; stream.synchronous = synchronous; stream.do_sync = true; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 8511e57cf7..c41bba28cd 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -39,8 +39,9 @@ static bool still_sending = true; /* feedback still needs to be sent? 
*/ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos); -static int CopyStreamPoll(PGconn *conn, long timeout_ms); -static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); +static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket); +static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, + char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status); static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, @@ -417,8 +418,15 @@ CheckServerVersionForStreaming(PGconn *conn) * return. As long as it returns false, streaming will continue * indefinitely. * + * If stream_stop() checks for external input, stop_socket should be set to + * the FD it checks. This will allow such input to be detected promptly + * rather than after standby_message_timeout (which might be indefinite). + * Note that signals will interrupt waits for input as well, but that is + * race-y since a signal received while busy won't interrupt the wait. + * * standby_message_timeout controls how often we send a message * back to the master letting it know our progress, in milliseconds. + * Zero means no messages are sent. * This message will only contain the write location, and never * flush or replay. * @@ -825,7 +833,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout, last_status); - r = CopyStreamReceive(conn, sleeptime, &copybuf); + r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf); while (r != 0) { if (r == -1) @@ -870,7 +878,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, * Process the received data, and any subsequent data we can read * without blocking. 
*/ - r = CopyStreamReceive(conn, 0, &copybuf); + r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf); } } @@ -881,20 +889,25 @@ error: } /* - * Wait until we can read CopyData message, or timeout. + * Wait until we can read a CopyData message, + * or timeout, or occurrence of a signal or input on the stop_socket. + * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.) * * Returns 1 if data has become available for reading, 0 if timed out - * or interrupted by signal, and -1 on an error. + * or interrupted by signal or stop_socket input, and -1 on an error. */ static int -CopyStreamPoll(PGconn *conn, long timeout_ms) +CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) { int ret; fd_set input_mask; + int connsocket; + int maxfd; struct timeval timeout; struct timeval *timeoutptr; - if (PQsocket(conn) < 0) + connsocket = PQsocket(conn); + if (connsocket < 0) { fprintf(stderr, _("%s: invalid socket: %s"), progname, PQerrorMessage(conn)); @@ -902,7 +915,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms) } FD_ZERO(&input_mask); - FD_SET(PQsocket(conn), &input_mask); + FD_SET(connsocket, &input_mask); + maxfd = connsocket; + if (stop_socket != PGINVALID_SOCKET) + { + FD_SET(stop_socket, &input_mask); + maxfd = Max(maxfd, stop_socket); + } if (timeout_ms < 0) timeoutptr = NULL; @@ -913,17 +932,20 @@ CopyStreamPoll(PGconn *conn, long timeout_ms) timeoutptr = &timeout; } - ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); - if (ret == 0 || (ret < 0 && errno == EINTR)) - return 0; /* Got a timeout or signal */ - else if (ret < 0) + ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr); + + if (ret < 0) { + if (errno == EINTR) + return 0; /* Got a signal, so not an error */ fprintf(stderr, _("%s: select() failed: %s\n"), progname, strerror(errno)); return -1; } + if (ret > 0 && FD_ISSET(connsocket, &input_mask)) + return 1; /* Got input on connection socket */ - return 1; + return 0; /* Got timeout or input on 
stop_socket */ } /* @@ -934,11 +956,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms) * point to a buffer holding the received message. The buffer is only valid * until the next CopyStreamReceive call. * - * 0 if no data was available within timeout, or wait was interrupted - * by signal. -1 on error. -2 if the server ended the COPY. + * Returns 0 if no data was available within timeout, or if wait was + * interrupted by signal or stop_socket input. + * -1 on error. -2 if the server ended the COPY. */ static int -CopyStreamReceive(PGconn *conn, long timeout, char **buffer) +CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, + char **buffer) { char *copybuf = NULL; int rawlen; @@ -951,20 +975,18 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer) rawlen = PQgetCopyData(conn, &copybuf, 1); if (rawlen == 0) { + int ret; + /* - * No data available. Wait for some to appear, but not longer than the - * specified timeout, so that we can ping the server. + * No data available. Wait for some to appear, but not longer than + * the specified timeout, so that we can ping the server. Also stop + * waiting if input appears on stop_socket. 
*/ - if (timeout != 0) - { - int ret; - - ret = CopyStreamPoll(conn, timeout); - if (ret <= 0) - return ret; - } + ret = CopyStreamPoll(conn, timeout, stop_socket); + if (ret <= 0) + return ret; - /* Else there is actually data on the socket */ + /* Now there is actually data on the socket */ if (PQconsumeInput(conn) == 0) { fprintf(stderr, diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 42e93ac745..9a51d9a9c4 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -42,6 +42,9 @@ typedef struct StreamCtl stream_stop_callback stream_stop; /* Stop streaming when returns true */ + pgsocket stop_socket; /* if valid, watch for input on this socket + * and check stream_stop() when there is any */ + WalWriteMethod *walmethod; /* How to write the WAL */ char *partial_suffix; /* Suffix appended to partially received files */ char *replication_slot; /* Replication slot to use, or NULL */ -- 2.40.0