]> granicus.if.org Git - python/commitdiff
[3.6] bpo-31310: multiprocessing's semaphore tracker should be launched again if...
authorAntoine Pitrou <pitrou@free.fr>
Fri, 3 Nov 2017 13:58:37 +0000 (14:58 +0100)
committerGitHub <noreply@github.com>
Fri, 3 Nov 2017 13:58:37 +0000 (14:58 +0100)
* bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed

* Avoid mucking with process state in test.
Add a warning if the semaphore process died, as semaphores may then be leaked.

* Add NEWS entry
(cherry picked from commit cbe1756)

Lib/multiprocessing/semaphore_tracker.py
Lib/test/_test_multiprocessing.py
Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst [new file with mode: 0644]

index de7738eeee8ab264ab059c99d749ee62372c3c39..3b50a46ddc5fbf21b3ad9b58a5b30bc1bba98e9e 100644 (file)
@@ -29,6 +29,7 @@ class SemaphoreTracker(object):
     def __init__(self):
         self._lock = threading.Lock()
         self._fd = None
+        self._pid = None
 
     def getfd(self):
         self.ensure_running()
@@ -40,8 +41,20 @@ class SemaphoreTracker(object):
         This can be run from any process.  Usually a child process will use
         the semaphore created by its parent.'''
         with self._lock:
-            if self._fd is not None:
-                return
+            if self._pid is not None:
+                # semaphore tracker was launched before, is it still running?
+                pid, status = os.waitpid(self._pid, os.WNOHANG)
+                if not pid:
+                    # => still alive
+                    return
+                # => dead, launch it again
+                os.close(self._fd)
+                self._fd = None
+                self._pid = None
+
+                warnings.warn('semaphore_tracker: process died unexpectedly, '
+                              'relaunching.  Some semaphores might leak.')
+
             fds_to_pass = []
             try:
                 fds_to_pass.append(sys.stderr.fileno())
@@ -55,12 +68,13 @@ class SemaphoreTracker(object):
                 exe = spawn.get_executable()
                 args = [exe] + util._args_from_interpreter_flags()
                 args += ['-c', cmd % r]
-                util.spawnv_passfds(exe, args, fds_to_pass)
+                pid = util.spawnv_passfds(exe, args, fds_to_pass)
             except:
                 os.close(w)
                 raise
             else:
                 self._fd = w
+                self._pid = pid
             finally:
                 os.close(r)
 
index 056474baa44d3d001e49e026d31ba31d98b11e26..f01c0041d652dbf6c1477dc25d044202b4b8e412 100644 (file)
@@ -4,6 +4,7 @@
 
 import unittest
 import queue as pyqueue
+import contextlib
 import time
 import io
 import itertools
@@ -4125,14 +4126,14 @@ class TestStartMethod(unittest.TestCase):
             self.fail("failed spawning forkserver or grandchild")
 
 
-#
-# Check that killing process does not leak named semaphores
-#
-
 @unittest.skipIf(sys.platform == "win32",
                  "test semantics don't make sense on Windows")
 class TestSemaphoreTracker(unittest.TestCase):
+
     def test_semaphore_tracker(self):
+        #
+        # Check that killing process does not leak named semaphores
+        #
         import subprocess
         cmd = '''if 1:
             import multiprocessing as mp, time, os
@@ -4166,6 +4167,40 @@ class TestSemaphoreTracker(unittest.TestCase):
         self.assertRegex(err, expected)
         self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
 
+    def check_semaphore_tracker_death(self, signum, should_die):
+        # bpo-31310: if the semaphore tracker process has died, it should
+        # be restarted implicitly.
+        from multiprocessing.semaphore_tracker import _semaphore_tracker
+        _semaphore_tracker.ensure_running()
+        pid = _semaphore_tracker._pid
+        os.kill(pid, signum)
+        time.sleep(1.0)  # give it time to die
+
+        ctx = multiprocessing.get_context("spawn")
+        with contextlib.ExitStack() as stack:
+            if should_die:
+                stack.enter_context(self.assertWarnsRegex(
+                    UserWarning,
+                    "semaphore_tracker: process died"))
+            sem = ctx.Semaphore()
+            sem.acquire()
+            sem.release()
+            wr = weakref.ref(sem)
+            # ensure `sem` gets collected, which triggers communication with
+            # the semaphore tracker
+            del sem
+            gc.collect()
+            self.assertIsNone(wr())
+
+    def test_semaphore_tracker_sigint(self):
+        # Catchable signal (ignored by semaphore tracker)
+        self.check_semaphore_tracker_death(signal.SIGINT, False)
+
+    def test_semaphore_tracker_sigkill(self):
+        # Uncatchable signal.
+        self.check_semaphore_tracker_death(signal.SIGKILL, True)
+
+
 class TestSimpleQueue(unittest.TestCase):
 
     @classmethod
diff --git a/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst
new file mode 100644 (file)
index 0000000..4d340f0
--- /dev/null
@@ -0,0 +1 @@
+multiprocessing's semaphore tracker should be launched again if crashed.