From a1a789eb5ac894b4ca4b7742f2dc2d9602116e46 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Mon, 29 Apr 2019 12:26:07 -0400 Subject: [PATCH] In walreceiver, don't try to do ereport() in a signal handler. This is quite unsafe, even for the case of ereport(FATAL) where we won't return control to the interrupted code, and despite this code's use of a flag to restrict the areas where we'd try to do it. It's possible for example that we interrupt malloc or free while that's holding a lock that's meant to protect against cross-thread interference. Then, any attempt to do malloc or free within ereport() will result in a deadlock, preventing the walreceiver process from exiting in response to SIGTERM. We hypothesize that this explains some hard-to-reproduce failures seen in the buildfarm. Hence, get rid of the immediate-exit code in WalRcvShutdownHandler, as well as the logic associated with WalRcvImmediateInterruptOK. Instead, we need to take care that potentially-blocking operations in the walreceiver's data transmission logic (libpqwalreceiver.c) will respond reasonably promptly to the process's latch becoming set and then call ProcessWalRcvInterrupts. Much of the needed code for that was already present in libpqwalreceiver.c. I refactored things a bit so that all the uses of PQgetResult use latch-aware waiting, but didn't need to do much more. These changes should be enough to ensure that libpqwalreceiver.c will respond promptly to SIGTERM whenever it's waiting to receive data. In principle, it could block for a long time while waiting to send data too, and this patch does nothing to guard against that. I think that that hazard is mostly theoretical though: such blocking should occur only if we fill the kernel's data transmission buffers, and we don't generally send enough data to make that happen without waiting for input. If we find out that the hazard isn't just theoretical, we could fix it by using PQsetnonblocking, but that would require more ticklish changes than I care to make now. This is a bug fix, but it seems like too big a change to push into the back branches without much more testing than there's time for right now. Perhaps we'll back-patch once we have more confidence in the change. Patch by me; thanks to Thomas Munro for review. Discussion: https://postgr.es/m/20190416070119.GK2673@paquier.xyz --- .../libpqwalreceiver/libpqwalreceiver.c | 119 ++++++++++-------- src/backend/replication/walreceiver.c | 66 +++------- src/include/replication/walreceiver.h | 1 + 3 files changed, 85 insertions(+), 101 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 7123d4169d..765d58d120 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -99,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { /* Prototypes for private functions */ static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); +static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); static char *stringlist_to_identifierstr(PGconn *conn, List *strings); /* @@ -196,7 +197,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); + ProcessWalRcvInterrupts(); } /* If socket is ready, advance the libpq state machine */ @@ -456,6 +457,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) { PGresult *res; + /* + * Send copy-end message. As in libpqrcv_PQexec, this could theoretically + * block, but the risk seems small. + */ if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || PQflush(conn->streamConn)) ereport(ERROR, @@ -472,7 +477,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * also possible in case we aborted the copy in mid-stream. */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -486,7 +491,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); } else if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -499,7 +504,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); } if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -509,7 +514,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* Verify that there are no more results */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); if (res != NULL) ereport(ERROR, (errmsg("unexpected result after CommandComplete: %s", @@ -572,12 +577,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * The function is modeled on PQexec() in libpq, but only implements * those parts that are in use in the walreceiver api. * - * Queries are always executed on the connection in streamConn. + * May return NULL, rather than an error result, on failure. */ static PGresult * libpqrcv_PQexec(PGconn *streamConn, const char *query) { - PGresult *result = NULL; PGresult *lastResult = NULL; /* @@ -588,60 +592,26 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) */ /* - * Submit a query. Since we don't use non-blocking mode, this also can - * block. But its risk is relatively small, so we ignore that for now. + * Submit the query. Since we don't use non-blocking mode, this could + * theoretically block. In practice, since we don't send very long query + * strings, the risk seems negligible. */ if (!PQsendQuery(streamConn, query)) return NULL; for (;;) { - /* - * Receive data until PQgetResult is ready to get the result without - * blocking. - */ - while (PQisBusy(streamConn)) - { - int rc; - - /* - * We don't need to break down the sleep into smaller increments, - * since we'll get interrupted by signals and can either handle - * interrupts here or elog(FATAL) within SIGTERM signal handler if - * the signal arrives in the middle of establishment of - * replication connection. - */ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); - - /* Interrupted? */ - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } + /* Wait for, and collect, the next PGresult. */ + PGresult *result; - /* Consume whatever data is available from the socket */ - if (PQconsumeInput(streamConn) == 0) - { - /* trouble; drop whatever we had and return NULL */ - PQclear(lastResult); - return NULL; - } - } + result = libpqrcv_PQgetResult(streamConn); + if (result == NULL) + break; /* query is complete, or failure */ /* * Emulate PQexec()'s behavior of returning the last result when there * are many. We are fine with returning just last error message. */ - result = PQgetResult(streamConn); - if (result == NULL) - break; /* query is complete */ - PQclear(lastResult); lastResult = result; @@ -655,6 +625,51 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) return lastResult; } +/* + * Perform the equivalent of PQgetResult(), but watch for interrupts. + */ +static PGresult * +libpqrcv_PQgetResult(PGconn *streamConn) +{ + /* + * Collect data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(streamConn)) + { + int rc; + + /* + * We don't need to break down the sleep into smaller increments, + * since we'll get interrupted by signals and can handle any + * interrupts here. + */ + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | + WL_LATCH_SET, + PQsocket(streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + ProcessWalRcvInterrupts(); + } + + /* Consume whatever data is available from the socket */ + if (PQconsumeInput(streamConn) == 0) + { + /* trouble; return NULL */ + return NULL; + } + } + + /* Now we can collect and return the next PGresult */ + return PQgetResult(streamConn); +} + /* * Disconnect connection to primary, if any. */ @@ -716,13 +731,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PGresult *res; - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); /* Verify that there are no more results. */ - res = PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn->streamConn); if (res != NULL) { PQclear(res); @@ -886,7 +901,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, { char *cstrs[MaxTupleAttributeNumber]; - CHECK_FOR_INTERRUPTS(); + ProcessWalRcvInterrupts(); /* Do the allocations in temporary context. */ oldcontext = MemoryContextSwitchTo(rowcontext); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index f32cf91ffb..d52ec7b2cf 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -112,28 +112,7 @@ static struct static StringInfoData reply_message; static StringInfoData incoming_message; -/* - * About SIGTERM handling: - * - * We can't just exit(1) within SIGTERM signal handler, because the signal - * might arrive in the middle of some critical operation, like while we're - * holding a spinlock. We also can't just set a flag in signal handler and - * check it in the main loop, because we perform some blocking operations - * like libpqrcv_PQexec(), which can take a long time to finish. - * - * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's - * safe for the signal handler to elog(FATAL) immediately. Otherwise it just - * sets got_SIGTERM flag, which is checked in the main loop when convenient. - * - * This is very much like what regular backends do with ImmediateInterruptOK, - * ProcessInterrupts() etc. - */ -static volatile bool WalRcvImmediateInterruptOK = false; - /* Prototypes for private functions */ -static void ProcessWalRcvInterrupts(void); -static void EnableWalRcvImmediateExit(void); -static void DisableWalRcvImmediateExit(void); static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvDie(int code, Datum arg); @@ -151,7 +130,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); -static void +/* + * Process any interrupts the walreceiver process may have received. + * This should be called any time the process's latch has become set. + * + * Currently, only SIGTERM is of interest. We can't just exit(1) within the + * SIGTERM signal handler, because the signal might arrive in the middle of + * some critical operation, like while we're holding a spinlock. Instead, the + * signal handler sets a flag variable as well as setting the process's latch. + * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the + * latch has become set. Operations that could block for a long time, such as + * reading from a remote server, must pay attention to the latch too; see + * libpqrcv_PQgetResult for example. + */ +void ProcessWalRcvInterrupts(void) { /* @@ -163,26 +155,12 @@ ProcessWalRcvInterrupts(void) if (got_SIGTERM) { - WalRcvImmediateInterruptOK = false; ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating walreceiver process due to administrator command"))); } } -static void -EnableWalRcvImmediateExit(void) -{ - WalRcvImmediateInterruptOK = true; - ProcessWalRcvInterrupts(); -} - -static void -DisableWalRcvImmediateExit(void) -{ - WalRcvImmediateInterruptOK = false; - ProcessWalRcvInterrupts(); -} /* Main entry point for walreceiver process */ void @@ -292,12 +270,10 @@ WalReceiverMain(void) PG_SETMASK(&UnBlockSig); /* Establish the connection to the primary for XLOG streaming */ - EnableWalRcvImmediateExit(); wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err); if (!wrconn) ereport(ERROR, (errmsg("could not connect to the primary server: %s", err))); - DisableWalRcvImmediateExit(); /* * Save user-visible connection string. This clobbers the original @@ -336,7 +312,6 @@ WalReceiverMain(void) * Check that we're connected to a valid server using the * IDENTIFY_SYSTEM replication command. */ - EnableWalRcvImmediateExit(); primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, @@ -348,7 +323,6 @@ WalReceiverMain(void) errdetail("The primary's identifier is %s, the standby's identifier is %s.", primary_sysid, standby_sysid))); } - DisableWalRcvImmediateExit(); /* * Confirm that the current timeline of the primary is the same or @@ -509,6 +483,8 @@ WalReceiverMain(void) if (rc & WL_LATCH_SET) { ResetLatch(walrcv->latch); + ProcessWalRcvInterrupts(); + if (walrcv->force_reply) { /* @@ -577,9 +553,7 @@ WalReceiverMain(void) * The backend finished streaming. Exit streaming COPY-mode from * our side, too. */ - EnableWalRcvImmediateExit(); walrcv_endstreaming(wrconn, &primaryTLI); - DisableWalRcvImmediateExit(); /* * If the server had switched to a new timeline that we didn't @@ -726,9 +700,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) (errmsg("fetching timeline history file for timeline %u from primary server", tli))); - EnableWalRcvImmediateExit(); walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len); - DisableWalRcvImmediateExit(); /* * Check that the filename on the master matches what we @@ -805,7 +777,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS) errno = save_errno; } -/* SIGTERM: set flag for main loop, or shutdown immediately if safe */ +/* SIGTERM: set flag for ProcessWalRcvInterrupts */ static void WalRcvShutdownHandler(SIGNAL_ARGS) { @@ -816,10 +788,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS) if (WalRcv->latch) SetLatch(WalRcv->latch); - /* Don't joggle the elbow of proc_exit */ - if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) - ProcessWalRcvInterrupts(); - errno = save_errno; } diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 33e89cae36..7f2927cb46 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -302,6 +302,7 @@ walrcv_clear_result(WalRcvExecResult *walres) /* prototypes for functions in walreceiver.c */ extern void WalReceiverMain(void) pg_attribute_noreturn(); +extern void ProcessWalRcvInterrupts(void); /* prototypes for functions in walreceiverfuncs.c */ extern Size WalRcvShmemSize(void); -- 2.40.0