Block interrupts during HandleParallelMessages().
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 2 Aug 2016 20:39:16 +0000 (16:39 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 2 Aug 2016 20:39:16 +0000 (16:39 -0400)
As noted by Alvaro, there are CHECK_FOR_INTERRUPTS() calls in the shm_mq.c
functions called by HandleParallelMessages().  I believe they're all
unreachable since we always pass nowait = true, but it doesn't seem like
a great idea to assume that no such call will ever be reachable from
HandleParallelMessages().  If that did happen, there would be a risk of a
recursive call to HandleParallelMessages(), which it does not appear to be
designed for --- for example, there's nothing that would prevent
out-of-order processing of received messages.  And certainly such cases
cannot easily be tested.  So let's prevent it by holding off interrupts for
the duration of the function.  Back-patch to 9.5 which contains identical
code.

Discussion: <14869.1470083848@sss.pgh.pa.us>

src/backend/access/transam/parallel.c

index a303fca35ce6f79f951d4e888179d8a7cd76ce77..a47eba647bcba5e5b8ce7841cf376e7921c369b3 100644 (file)
@@ -704,14 +704,21 @@ HandleParallelMessages(void)
 {
        dlist_iter      iter;
 
+       /*
+        * This is invoked from ProcessInterrupts(), and since some of the
+        * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
+        * for recursive calls if more signals are received while this runs.  It's
+        * unclear that recursive entry would be safe, and it doesn't seem useful
+        * even if it is safe, so let's block interrupts until done.
+        */
+       HOLD_INTERRUPTS();
+
        ParallelMessagePending = false;
 
        dlist_foreach(iter, &pcxt_list)
        {
                ParallelContext *pcxt;
                int                     i;
-               Size            nbytes;
-               void       *data;
 
                pcxt = dlist_container(ParallelContext, node, iter.cur);
                if (pcxt->worker == NULL)
@@ -721,13 +728,15 @@ HandleParallelMessages(void)
                {
                        /*
                         * Read as many messages as we can from each worker, but stop when
-                        * either (1) the error queue goes away, which can happen if we
-                        * receive a Terminate message from the worker; or (2) no more
-                        * messages can be read from the worker without blocking.
+                        * either (1) the worker's error queue goes away, which can happen
+                        * if we receive a Terminate message from the worker; or (2) no
+                        * more messages can be read from the worker without blocking.
                         */
                        while (pcxt->worker[i].error_mqh != NULL)
                        {
                                shm_mq_result res;
+                               Size            nbytes;
+                               void       *data;
 
                                res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
                                                                         &data, true);
@@ -749,6 +758,8 @@ HandleParallelMessages(void)
                        }
                }
        }
+
+       RESUME_INTERRUPTS();
 }
 
 /*