]> granicus.if.org Git - python/commitdiff
bpo-36668: FIX reuse semaphore tracker for child processes (#5172)
authorThomas Moreau <thomas.moreau.2010@gmail.com>
Wed, 24 Apr 2019 19:45:52 +0000 (21:45 +0200)
committerAntoine Pitrou <antoine@python.org>
Wed, 24 Apr 2019 19:45:52 +0000 (21:45 +0200)
Fix the multiprocessing.semaphore_tracker so it is reused by child processes.

Lib/multiprocessing/semaphore_tracker.py
Lib/test/_test_multiprocessing.py
Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst [new file with mode: 0644]

index 82833bcf861a49acb17ef54ac51bdd19e8285446..3c2c3ad61aeeec3c4669f0e22c1e8c0be0d61c1f 100644 (file)
@@ -44,20 +44,23 @@ class SemaphoreTracker(object):
         This can be run from any process.  Usually a child process will use
         the semaphore created by its parent.'''
         with self._lock:
-            if self._pid is not None:
+            if self._fd is not None:
                 # semaphore tracker was launched before, is it still running?
+                if self._check_alive():
+                    # => still alive
+                    return
+                # => dead, launch it again
+                os.close(self._fd)
+
+                # Clean-up to avoid dangling processes.
                 try:
-                    pid, _ = os.waitpid(self._pid, os.WNOHANG)
+                    # _pid can be None if this process is a child from another
+                    # python process, which has started the semaphore_tracker.
+                    if self._pid is not None:
+                        os.waitpid(self._pid, 0)
                 except ChildProcessError:
-                    # The process terminated
+                    # The semaphore_tracker has already been terminated.
                     pass
-                else:
-                    if not pid:
-                        # => still alive
-                        return
-
-                # => dead, launch it again
-                os.close(self._fd)
                 self._fd = None
                 self._pid = None
 
@@ -99,6 +102,17 @@ class SemaphoreTracker(object):
             finally:
                 os.close(r)
 
+    def _check_alive(self):
+        '''Check that the pipe has not been closed by sending a probe.'''
+        try:
+            # We cannot use send here as it calls ensure_running, creating
+            # a cycle.
+            os.write(self._fd, b'PROBE:0\n')
+        except OSError:
+            return False
+        else:
+            return True
+
     def register(self, name):
         '''Register name of semaphore with semaphore tracker.'''
         self._send('REGISTER', name)
@@ -150,6 +164,8 @@ def main(fd):
                         cache.add(name)
                     elif cmd == b'UNREGISTER':
                         cache.remove(name)
+                    elif cmd == b'PROBE':
+                        pass
                     else:
                         raise RuntimeError('unrecognized command %r' % cmd)
                 except Exception:
index 553ab8178316a9ba1c477c4300b5cd6894b67094..836fde88cd266db83db4c0b3594056b6ecfef2e9 100644 (file)
@@ -4891,6 +4891,34 @@ class TestSemaphoreTracker(unittest.TestCase):
         # Uncatchable signal.
         self.check_semaphore_tracker_death(signal.SIGKILL, True)
 
+    @staticmethod
+    def _is_semaphore_tracker_reused(conn, pid):
+        from multiprocessing.semaphore_tracker import _semaphore_tracker
+        _semaphore_tracker.ensure_running()
+        # The pid should be None in the child process, expect for the fork
+        # context. It should not be a new value.
+        reused = _semaphore_tracker._pid in (None, pid)
+        reused &= _semaphore_tracker._check_alive()
+        conn.send(reused)
+
+    def test_semaphore_tracker_reused(self):
+        from multiprocessing.semaphore_tracker import _semaphore_tracker
+        _semaphore_tracker.ensure_running()
+        pid = _semaphore_tracker._pid
+
+        r, w = multiprocessing.Pipe(duplex=False)
+        p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
+                                    args=(w, pid))
+        p.start()
+        is_semaphore_tracker_reused = r.recv()
+
+        # Clean up
+        p.join()
+        w.close()
+        r.close()
+
+        self.assertTrue(is_semaphore_tracker_reused)
+
 
 class TestSimpleQueue(unittest.TestCase):
 
diff --git a/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst
new file mode 100644 (file)
index 0000000..32ebf4e
--- /dev/null
@@ -0,0 +1 @@
+Fix the multiprocessing.semaphore_tracker so it is reused by child processes