granicus.if.org Git - python/commitdiff
Fix bpo-30589: improve Process.exitcode with forkserver (#1989)
author Antoine Pitrou <pitrou@free.fr>
Mon, 12 Jun 2017 13:28:19 +0000 (15:28 +0200)
committer GitHub <noreply@github.com>
Mon, 12 Jun 2017 13:28:19 +0000 (15:28 +0200)
* Fix bpo-30589: improve Process.exitcode with forkserver

When the child is killed, Process.exitcode should return -signum, not 255.

* Add Misc/NEWS

Lib/multiprocessing/forkserver.py
Lib/multiprocessing/popen_fork.py
Lib/multiprocessing/popen_forkserver.py
Lib/test/_test_multiprocessing.py
Misc/NEWS

index 8156dae3b79f8535616caa8a44b1718dae51fa8c..ddbd0c257399baf05e3459fc4a2ada062cba9302 100644 (file)
@@ -6,6 +6,7 @@ import socket
 import struct
 import sys
 import threading
+import warnings
 
 from . import connection
 from . import process
@@ -22,7 +23,7 @@ __all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
 #
 
 MAXFDS_TO_SEND = 256
-UNSIGNED_STRUCT = struct.Struct('Q')     # large enough for pid_t
+SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t
 
 #
 # Forkserver class
@@ -148,21 +149,33 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
 
     util._close_stdin()
 
-    # ignoring SIGCHLD means no need to reap zombie processes;
+    sig_r, sig_w = os.pipe()
+    os.set_blocking(sig_w, False)
+
+    def sigchld_handler(*_unused):
+        try:
+            os.write(sig_w, b'.')
+        except BlockingIOError:
+            pass
+
     # letting SIGINT through avoids KeyboardInterrupt tracebacks
     handlers = {
-        signal.SIGCHLD: signal.SIG_IGN,
+        signal.SIGCHLD: sigchld_handler,
         signal.SIGINT: signal.SIG_DFL,
         }
     old_handlers = {sig: signal.signal(sig, val)
                     for (sig, val) in handlers.items()}
 
+    # map child pids to client fds
+    pid_to_fd = {}
+
     with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
          selectors.DefaultSelector() as selector:
         _forkserver._forkserver_address = listener.getsockname()
 
         selector.register(listener, selectors.EVENT_READ)
         selector.register(alive_r, selectors.EVENT_READ)
+        selector.register(sig_r, selectors.EVENT_READ)
 
         while True:
             try:
@@ -176,62 +189,100 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
                     assert os.read(alive_r, 1) == b''
                     raise SystemExit
 
-                assert listener in rfds
-                with listener.accept()[0] as s:
-                    code = 1
-                    if os.fork() == 0:
+                if sig_r in rfds:
+                    # Got SIGCHLD
+                    os.read(sig_r, 65536)  # exhaust
+                    while True:
+                        # Scan for child processes
                         try:
-                            _serve_one(s, listener, alive_r, old_handlers)
-                        except Exception:
-                            sys.excepthook(*sys.exc_info())
-                            sys.stderr.flush()
-                        finally:
-                            os._exit(code)
+                            pid, sts = os.waitpid(-1, os.WNOHANG)
+                        except ChildProcessError:
+                            break
+                        if pid == 0:
+                            break
+                        child_w = pid_to_fd.pop(pid, None)
+                        if child_w is not None:
+                            if os.WIFSIGNALED(sts):
+                                returncode = -os.WTERMSIG(sts)
+                            else:
+                                assert os.WIFEXITED(sts)
+                                returncode = os.WEXITSTATUS(sts)
+                            # Write the exit code to the pipe
+                            write_signed(child_w, returncode)
+                            os.close(child_w)
+                        else:
+                            # This shouldn't happen really
+                            warnings.warn('forkserver: waitpid returned '
+                                          'unexpected pid %d' % pid)
+
+                if listener in rfds:
+                    # Incoming fork request
+                    with listener.accept()[0] as s:
+                        # Receive fds from client
+                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
+                        assert len(fds) <= MAXFDS_TO_SEND
+                        child_r, child_w, *fds = fds
+                        s.close()
+                        pid = os.fork()
+                        if pid == 0:
+                            # Child
+                            code = 1
+                            try:
+                                listener.close()
+                                code = _serve_one(child_r, fds,
+                                                  (alive_r, child_w, sig_r, sig_w),
+                                                  old_handlers)
+                            except Exception:
+                                sys.excepthook(*sys.exc_info())
+                                sys.stderr.flush()
+                            finally:
+                                os._exit(code)
+                        else:
+                            # Send pid to client processes
+                            write_signed(child_w, pid)
+                            pid_to_fd[pid] = child_w
+                            os.close(child_r)
+                            for fd in fds:
+                                os.close(fd)
 
             except OSError as e:
                 if e.errno != errno.ECONNABORTED:
                     raise
 
-def _serve_one(s, listener, alive_r, handlers):
+
+def _serve_one(child_r, fds, unused_fds, handlers):
     # close unnecessary stuff and reset signal handlers
-    listener.close()
-    os.close(alive_r)
     for sig, val in handlers.items():
         signal.signal(sig, val)
+    for fd in unused_fds:
+        os.close(fd)
 
-    # receive fds from parent process
-    fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
-    s.close()
-    assert len(fds) <= MAXFDS_TO_SEND
-    (child_r, child_w, _forkserver._forkserver_alive_fd,
-     stfd, *_forkserver._inherited_fds) = fds
-    semaphore_tracker._semaphore_tracker._fd = stfd
-
-    # send pid to client processes
-    write_unsigned(child_w, os.getpid())
+    (_forkserver._forkserver_alive_fd,
+     semaphore_tracker._semaphore_tracker._fd,
+     *_forkserver._inherited_fds) = fds
 
-    # run process object received over pipe
+    # Run process object received over pipe
     code = spawn._main(child_r)
 
-    # write the exit code to the pipe
-    write_unsigned(child_w, code)
+    return code
+
 
 #
-# Read and write unsigned numbers
+# Read and write signed numbers
 #
 
-def read_unsigned(fd):
+def read_signed(fd):
     data = b''
-    length = UNSIGNED_STRUCT.size
+    length = SIGNED_STRUCT.size
     while len(data) < length:
         s = os.read(fd, length - len(data))
         if not s:
             raise EOFError('unexpected EOF')
         data += s
-    return UNSIGNED_STRUCT.unpack(data)[0]
+    return SIGNED_STRUCT.unpack(data)[0]
 
-def write_unsigned(fd, n):
-    msg = UNSIGNED_STRUCT.pack(n)
+def write_signed(fd, n):
+    msg = SIGNED_STRUCT.pack(n)
     while msg:
         nbytes = os.write(fd, msg)
         if nbytes == 0:
index 683b52d2271ffa4f80b8853143a2d94c8fb4d8eb..ca28bf37de0d828ec2bd1b7faf6c473380471cb3 100644 (file)
@@ -24,15 +24,12 @@ class Popen(object):
 
     def poll(self, flag=os.WNOHANG):
         if self.returncode is None:
-            while True:
-                try:
-                    pid, sts = os.waitpid(self.pid, flag)
-                except OSError as e:
-                    # Child process not yet created. See #1731717
-                    # e.errno == errno.ECHILD == 10
-                    return None
-                else:
-                    break
+            try:
+                pid, sts = os.waitpid(self.pid, flag)
+            except OSError as e:
+                # Child process not yet created. See #1731717
+                # e.errno == errno.ECHILD == 10
+                return None
             if pid == self.pid:
                 if os.WIFSIGNALED(sts):
                     self.returncode = -os.WTERMSIG(sts)
index 222db2d90a31564bb94c1bb9ad1de918d4f3038f..fa8e574a34e834ce1144b737dca2836ee702b60a 100644 (file)
@@ -52,7 +52,7 @@ class Popen(popen_fork.Popen):
         util.Finalize(self, os.close, (self.sentinel,))
         with open(w, 'wb', closefd=True) as f:
             f.write(buf.getbuffer())
-        self.pid = forkserver.read_unsigned(self.sentinel)
+        self.pid = forkserver.read_signed(self.sentinel)
 
     def poll(self, flag=os.WNOHANG):
         if self.returncode is None:
@@ -61,8 +61,10 @@ class Popen(popen_fork.Popen):
             if not wait([self.sentinel], timeout):
                 return None
             try:
-                self.returncode = forkserver.read_unsigned(self.sentinel)
+                self.returncode = forkserver.read_signed(self.sentinel)
             except (OSError, EOFError):
-                # The process ended abnormally perhaps because of a signal
+                # This should not happen usually, but perhaps the forkserver
+                # process itself got killed
                 self.returncode = 255
+
         return self.returncode
index f1f93674935e7ab13d80aba6d8922355c867066c..70ecc54bfec2c531fd38efb6f6285287eafe4a43 100644 (file)
@@ -274,6 +274,10 @@ class _TestProcess(BaseTestCase):
     def _test_terminate(cls):
         time.sleep(100)
 
+    @classmethod
+    def _test_sleep(cls, delay):
+        time.sleep(delay)
+
     def test_terminate(self):
         if self.TYPE == 'threads':
             self.skipTest('test not appropriate for {}'.format(self.TYPE))
@@ -323,8 +327,9 @@ class _TestProcess(BaseTestCase):
 
         p.join()
 
-        # XXX sometimes get p.exitcode == 0 on Windows ...
-        #self.assertEqual(p.exitcode, -signal.SIGTERM)
+        # sometimes get p.exitcode == 0 on Windows ...
+        if os.name != 'nt':
+            self.assertEqual(p.exitcode, -signal.SIGTERM)
 
     def test_cpu_count(self):
         try:
@@ -398,6 +403,36 @@ class _TestProcess(BaseTestCase):
         p.join()
         self.assertTrue(wait_for_handle(sentinel, timeout=1))
 
+    def test_many_processes(self):
+        if self.TYPE == 'threads':
+            self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+        sm = multiprocessing.get_start_method()
+        N = 5 if sm == 'spawn' else 100
+
+        # Try to overwhelm the forkserver loop with events
+        procs = [self.Process(target=self._test_sleep, args=(0.01,))
+                 for i in range(N)]
+        for p in procs:
+            p.start()
+        for p in procs:
+            p.join(timeout=10)
+        for p in procs:
+            self.assertEqual(p.exitcode, 0)
+
+        procs = [self.Process(target=self._test_terminate)
+                 for i in range(N)]
+        for p in procs:
+            p.start()
+        time.sleep(0.001)  # let the children start...
+        for p in procs:
+            p.terminate()
+        for p in procs:
+            p.join(timeout=10)
+        if os.name != 'nt':
+            for p in procs:
+                self.assertEqual(p.exitcode, -signal.SIGTERM)
+
 #
 #
 #
index 88f1631c31576180301c5af3d947d3577e8b2cc5..491cad3d17d4fb2e1b70e975bece7e2bb27e37d9 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -362,6 +362,10 @@ Extension Modules
 Library
 -------
 
+- bpo-30589: Fix multiprocessing.Process.exitcode to return the opposite
+  of the signal number when the process is killed by a signal (instead
+  of 255) when using the "forkserver" method.
+
 - bpo-28994: The traceback no longer displayed for SystemExit raised in
   a callback registered by atexit.