]> granicus.if.org Git - postgresql/commitdiff
In walreceiver, don't try to do ereport() in a signal handler.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 12 Jun 2019 21:29:48 +0000 (17:29 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 12 Jun 2019 21:29:48 +0000 (17:29 -0400)
This is quite unsafe, even for the case of ereport(FATAL) where we won't
return control to the interrupted code, and despite this code's use of
a flag to restrict the areas where we'd try to do it.  It's possible
for example that we interrupt malloc or free while that's holding a lock
that's meant to protect against cross-thread interference.  Then, any
attempt to do malloc or free within ereport() will result in a deadlock,
preventing the walreceiver process from exiting in response to SIGTERM.
We hypothesize that this explains some hard-to-reproduce failures seen
in the buildfarm.

Hence, get rid of the immediate-exit code in WalRcvShutdownHandler,
as well as the logic associated with WalRcvImmediateInterruptOK.
Instead, we need to take care that potentially-blocking operations
in the walreceiver's data transmission logic (libpqwalreceiver.c)
will respond reasonably promptly to the process's latch becoming
set and then call ProcessWalRcvInterrupts.  Much of the needed code
for that was already present in libpqwalreceiver.c.  I refactored
things a bit so that all the uses of PQgetResult use latch-aware
waiting, but didn't need to do much more.

These changes should be enough to ensure that libpqwalreceiver.c
will respond promptly to SIGTERM whenever it's waiting to receive
data.  In principle, it could block for a long time while waiting
to send data too, and this patch does nothing to guard against that.
I think that that hazard is mostly theoretical though: such blocking
should occur only if we fill the kernel's data transmission buffers,
and we don't generally send enough data to make that happen without
waiting for input.  If we find out that the hazard isn't just
theoretical, we could fix it by using PQsetnonblocking, but that
would require more ticklish changes than I care to make now.

Back-patch of commit a1a789eb5.  This problem goes all the way back
to the origins of walreceiver; but given the substantial reworking
the module received during the v10 cycle, it seems unsafe to assume
that our testing on HEAD validates this patch for pre-v10 branches.
And we'd need to back-patch some prerequisite patches (at least
597a87ccc and its followups, maybe other things), increasing the risk
of problems.  Given the dearth of field reports matching this problem,
it's not worth much risk.  Hence back-patch to v10 and v11 only.

Patch by me; thanks to Thomas Munro for review.

Discussion: https://postgr.es/m/20190416070119.GK2673@paquier.xyz

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/include/replication/walreceiver.h

index 37b481c00200c5366c9608de9d4ed2f1fb1f77de..86f3d2355e0a3752bcb7284a6cf431d0985ec82e 100644 (file)
@@ -95,6 +95,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 
 /* Prototypes for private functions */
 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
+static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -196,7 +197,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
                if (rc & WL_LATCH_SET)
                {
                        ResetLatch(MyLatch);
-                       CHECK_FOR_INTERRUPTS();
+                       ProcessWalRcvInterrupts();
                }
 
                /* If socket is ready, advance the libpq state machine */
@@ -427,6 +428,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 {
        PGresult   *res;
 
+       /*
+        * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+        * block, but the risk seems small.
+        */
        if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
                PQflush(conn->streamConn))
                ereport(ERROR,
@@ -443,7 +448,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
         * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
         * also possible in case we aborted the copy in mid-stream.
         */
-       res = PQgetResult(conn->streamConn);
+       res = libpqrcv_PQgetResult(conn->streamConn);
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
        {
                /*
@@ -457,7 +462,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
                PQclear(res);
 
                /* the result set should be followed by CommandComplete */
-               res = PQgetResult(conn->streamConn);
+               res = libpqrcv_PQgetResult(conn->streamConn);
        }
        else if (PQresultStatus(res) == PGRES_COPY_OUT)
        {
@@ -470,7 +475,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
                                                        pchomp(PQerrorMessage(conn->streamConn)))));
 
                /* CommandComplete should follow */
-               res = PQgetResult(conn->streamConn);
+               res = libpqrcv_PQgetResult(conn->streamConn);
        }
 
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -480,7 +485,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
        PQclear(res);
 
        /* Verify that there are no more results */
-       res = PQgetResult(conn->streamConn);
+       res = libpqrcv_PQgetResult(conn->streamConn);
        if (res != NULL)
                ereport(ERROR,
                                (errmsg("unexpected result after CommandComplete: %s",
@@ -543,12 +548,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * The function is modeled on PQexec() in libpq, but only implements
  * those parts that are in use in the walreceiver api.
  *
- * Queries are always executed on the connection in streamConn.
+ * May return NULL, rather than an error result, on failure.
  */
 static PGresult *
 libpqrcv_PQexec(PGconn *streamConn, const char *query)
 {
-       PGresult   *result = NULL;
        PGresult   *lastResult = NULL;
 
        /*
@@ -559,64 +563,26 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
         */
 
        /*
-        * Submit a query. Since we don't use non-blocking mode, this also can
-        * block. But its risk is relatively small, so we ignore that for now.
+        * Submit the query.  Since we don't use non-blocking mode, this could
+        * theoretically block.  In practice, since we don't send very long query
+        * strings, the risk seems negligible.
         */
        if (!PQsendQuery(streamConn, query))
                return NULL;
 
        for (;;)
        {
-               /*
-                * Receive data until PQgetResult is ready to get the result without
-                * blocking.
-                */
-               while (PQisBusy(streamConn))
-               {
-                       int                     rc;
-
-                       /*
-                        * We don't need to break down the sleep into smaller increments,
-                        * since we'll get interrupted by signals and can either handle
-                        * interrupts here or elog(FATAL) within SIGTERM signal handler if
-                        * the signal arrives in the middle of establishment of
-                        * replication connection.
-                        */
-                       rc = WaitLatchOrSocket(MyLatch,
-                                                                  WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
-                                                                  WL_LATCH_SET,
-                                                                  PQsocket(streamConn),
-                                                                  0,
-                                                                  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-                       /* Emergency bailout? */
-                       if (rc & WL_POSTMASTER_DEATH)
-                               exit(1);
-
-                       /* Interrupted? */
-                       if (rc & WL_LATCH_SET)
-                       {
-                               ResetLatch(MyLatch);
-                               CHECK_FOR_INTERRUPTS();
-                       }
+               /* Wait for, and collect, the next PGresult. */
+               PGresult   *result;
 
-                       /* Consume whatever data is available from the socket */
-                       if (PQconsumeInput(streamConn) == 0)
-                       {
-                               /* trouble; drop whatever we had and return NULL */
-                               PQclear(lastResult);
-                               return NULL;
-                       }
-               }
+               result = libpqrcv_PQgetResult(streamConn);
+               if (result == NULL)
+                       break;                          /* query is complete, or failure */
 
                /*
                 * Emulate PQexec()'s behavior of returning the last result when there
                 * are many.  We are fine with returning just last error message.
                 */
-               result = PQgetResult(streamConn);
-               if (result == NULL)
-                       break;                          /* query is complete */
-
                PQclear(lastResult);
                lastResult = result;
 
@@ -630,6 +596,55 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
        return lastResult;
 }
 
+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static PGresult *
+libpqrcv_PQgetResult(PGconn *streamConn)
+{
+       /*
+        * Collect data until PQgetResult is ready to get the result without
+        * blocking.
+        */
+       while (PQisBusy(streamConn))
+       {
+               int                     rc;
+
+               /*
+                * We don't need to break down the sleep into smaller increments,
+                * since we'll get interrupted by signals and can handle any
+                * interrupts here.
+                */
+               rc = WaitLatchOrSocket(MyLatch,
+                                                          WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+                                                          WL_LATCH_SET,
+                                                          PQsocket(streamConn),
+                                                          0,
+                                                          WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+
+               /* Emergency bailout? */
+               if (rc & WL_POSTMASTER_DEATH)
+                       exit(1);
+
+               /* Interrupted? */
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       ProcessWalRcvInterrupts();
+               }
+
+               /* Consume whatever data is available from the socket */
+               if (PQconsumeInput(streamConn) == 0)
+               {
+                       /* trouble; return NULL */
+                       return NULL;
+               }
+       }
+
+       /* Now we can collect and return the next PGresult */
+       return PQgetResult(streamConn);
+}
+
 /*
  * Disconnect connection to primary, if any.
  */
@@ -691,13 +706,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
        {
                PGresult   *res;
 
-               res = PQgetResult(conn->streamConn);
+               res = libpqrcv_PQgetResult(conn->streamConn);
                if (PQresultStatus(res) == PGRES_COMMAND_OK)
                {
                        PQclear(res);
 
                        /* Verify that there are no more results. */
-                       res = PQgetResult(conn->streamConn);
+                       res = libpqrcv_PQgetResult(conn->streamConn);
                        if (res != NULL)
                        {
                                PQclear(res);
@@ -861,7 +876,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
        {
                char       *cstrs[MaxTupleAttributeNumber];
 
-               CHECK_FOR_INTERRUPTS();
+               ProcessWalRcvInterrupts();
 
                /* Do the allocations in temporary context. */
                oldcontext = MemoryContextSwitchTo(rowcontext);
index bde03978c9664623e341c9542132e24f0ebbab63..e4beaada12fb1f84b650ad312fe032630d24184e 100644 (file)
@@ -111,28 +111,7 @@ static struct
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
-/*
- * About SIGTERM handling:
- *
- * We can't just exit(1) within SIGTERM signal handler, because the signal
- * might arrive in the middle of some critical operation, like while we're
- * holding a spinlock. We also can't just set a flag in signal handler and
- * check it in the main loop, because we perform some blocking operations
- * like libpqrcv_PQexec(), which can take a long time to finish.
- *
- * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
- * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
- * sets got_SIGTERM flag, which is checked in the main loop when convenient.
- *
- * This is very much like what regular backends do with ImmediateInterruptOK,
- * ProcessInterrupts() etc.
- */
-static volatile bool WalRcvImmediateInterruptOK = false;
-
 /* Prototypes for private functions */
-static void ProcessWalRcvInterrupts(void);
-static void EnableWalRcvImmediateExit(void);
-static void DisableWalRcvImmediateExit(void);
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
@@ -150,7 +129,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
 
 
-static void
+/*
+ * Process any interrupts the walreceiver process may have received.
+ * This should be called any time the process's latch has become set.
+ *
+ * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
+ * SIGTERM signal handler, because the signal might arrive in the middle of
+ * some critical operation, like while we're holding a spinlock.  Instead, the
+ * signal handler sets a flag variable as well as setting the process's latch.
+ * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
+ * latch has become set.  Operations that could block for a long time, such as
+ * reading from a remote server, must pay attention to the latch too; see
+ * libpqrcv_PQgetResult for example.
+ */
+void
 ProcessWalRcvInterrupts(void)
 {
        /*
@@ -162,26 +154,12 @@ ProcessWalRcvInterrupts(void)
 
        if (got_SIGTERM)
        {
-               WalRcvImmediateInterruptOK = false;
                ereport(FATAL,
                                (errcode(ERRCODE_ADMIN_SHUTDOWN),
                                 errmsg("terminating walreceiver process due to administrator command")));
        }
 }
 
-static void
-EnableWalRcvImmediateExit(void)
-{
-       WalRcvImmediateInterruptOK = true;
-       ProcessWalRcvInterrupts();
-}
-
-static void
-DisableWalRcvImmediateExit(void)
-{
-       WalRcvImmediateInterruptOK = false;
-       ProcessWalRcvInterrupts();
-}
 
 /* Main entry point for walreceiver process */
 void
@@ -299,12 +277,10 @@ WalReceiverMain(void)
        PG_SETMASK(&UnBlockSig);
 
        /* Establish the connection to the primary for XLOG streaming */
-       EnableWalRcvImmediateExit();
        wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
        if (!wrconn)
                ereport(ERROR,
                                (errmsg("could not connect to the primary server: %s", err)));
-       DisableWalRcvImmediateExit();
 
        /*
         * Save user-visible connection string.  This clobbers the original
@@ -333,7 +309,6 @@ WalReceiverMain(void)
                 * Check that we're connected to a valid server using the
                 * IDENTIFY_SYSTEM replication command.
                 */
-               EnableWalRcvImmediateExit();
                primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
                                                                                           &server_version);
 
@@ -346,7 +321,6 @@ WalReceiverMain(void)
                                         errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                                                           primary_sysid, standby_sysid)));
                }
-               DisableWalRcvImmediateExit();
 
                /*
                 * Confirm that the current timeline of the primary is the same or
@@ -507,6 +481,8 @@ WalReceiverMain(void)
                                if (rc & WL_LATCH_SET)
                                {
                                        ResetLatch(walrcv->latch);
+                                       ProcessWalRcvInterrupts();
+
                                        if (walrcv->force_reply)
                                        {
                                                /*
@@ -584,9 +560,7 @@ WalReceiverMain(void)
                         * The backend finished streaming. Exit streaming COPY-mode from
                         * our side, too.
                         */
-                       EnableWalRcvImmediateExit();
                        walrcv_endstreaming(wrconn, &primaryTLI);
-                       DisableWalRcvImmediateExit();
 
                        /*
                         * If the server had switched to a new timeline that we didn't
@@ -740,9 +714,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
                                        (errmsg("fetching timeline history file for timeline %u from primary server",
                                                        tli)));
 
-                       EnableWalRcvImmediateExit();
                        walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
-                       DisableWalRcvImmediateExit();
 
                        /*
                         * Check that the filename on the master matches what we
@@ -819,7 +791,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
        errno = save_errno;
 }
 
-/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
+/* SIGTERM: set flag for ProcessWalRcvInterrupts */
 static void
 WalRcvShutdownHandler(SIGNAL_ARGS)
 {
@@ -830,10 +802,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
        if (WalRcv->latch)
                SetLatch(WalRcv->latch);
 
-       /* Don't joggle the elbow of proc_exit */
-       if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
-               ProcessWalRcvInterrupts();
-
        errno = save_errno;
 }
 
index 742ab6be0003dd8d3701a20d9b95a2d530cacc24..bf59c5a1055c908202810a331bfbbe00c4dfc881 100644 (file)
@@ -285,6 +285,7 @@ walrcv_clear_result(WalRcvExecResult *walres)
 
 /* prototypes for functions in walreceiver.c */
 extern void WalReceiverMain(void) pg_attribute_noreturn();
+extern void ProcessWalRcvInterrupts(void);
 
 /* prototypes for functions in walreceiverfuncs.c */
 extern Size WalRcvShmemSize(void);