Fix bpo-30596: Add close() method to multiprocessing.Process (#2010)
authorAntoine Pitrou <pitrou@free.fr>
Sat, 24 Jun 2017 17:22:23 +0000 (19:22 +0200)
committerGitHub <noreply@github.com>
Sat, 24 Jun 2017 17:22:23 +0000 (19:22 +0200)
* Fix bpo-30596: Add close() method to multiprocessing.Process

* Raise ValueError if close() is called before the Process is finished running

* Add docs

* Add NEWS blurb

Doc/library/multiprocessing.rst
Lib/multiprocessing/forkserver.py
Lib/multiprocessing/popen_fork.py
Lib/multiprocessing/popen_forkserver.py
Lib/multiprocessing/popen_spawn_posix.py
Lib/multiprocessing/popen_spawn_win32.py
Lib/multiprocessing/process.py
Lib/test/_test_multiprocessing.py
Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst [new file with mode: 0644]

index 6b4a8cbd299e4c53b833918e3018f7dace17221a..5265639edb975d10735484918093bb52c4d7dc4a 100644 (file)
@@ -598,6 +598,16 @@ The :mod:`multiprocessing` package mostly replicates the API of the
          acquired a lock or semaphore etc. then terminating it is liable to
          cause other processes to deadlock.
 
+   .. method:: close()
+
+      Close the :class:`Process` object, releasing all resources associated
+      with it.  :exc:`ValueError` is raised if the underlying process
+      is still running.  Once :meth:`close` returns successfully, most
+      other methods and attributes of the :class:`Process` object will
+      raise :exc:`ValueError`.
+
+      .. versionadded:: 3.7
+
    Note that the :meth:`start`, :meth:`join`, :meth:`is_alive`,
    :meth:`terminate` and :attr:`exitcode` methods should only be called by
    the process that created the process object.
index 70105158e559de9b380a0395e8cb7a345b73b5ae..b9f9b9dd8b556655dd04042297ea3d56edbe3ca1 100644 (file)
@@ -210,8 +210,12 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
                             else:
                                 assert os.WIFEXITED(sts)
                                 returncode = os.WEXITSTATUS(sts)
-                            # Write the exit code to the pipe
-                            write_signed(child_w, returncode)
+                            # Send exit code to client process
+                            try:
+                                write_signed(child_w, returncode)
+                            except BrokenPipeError:
+                                # client vanished
+                                pass
                             os.close(child_w)
                         else:
                             # This shouldn't happen really
@@ -241,8 +245,12 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
                             finally:
                                 os._exit(code)
                         else:
-                            # Send pid to client processes
-                            write_signed(child_w, pid)
+                            # Send pid to client process
+                            try:
+                                write_signed(child_w, pid)
+                            except BrokenPipeError:
+                                # client vanished
+                                pass
                             pid_to_fd[pid] = child_w
                             os.close(child_r)
                             for fd in fds:
index ca28bf37de0d828ec2bd1b7faf6c473380471cb3..5af9d919e4987ed87451af0c5eb21685ef018403 100644 (file)
@@ -17,6 +17,7 @@ class Popen(object):
         sys.stdout.flush()
         sys.stderr.flush()
         self.returncode = None
+        self.finalizer = None
         self._launch(process_obj)
 
     def duplicate_for_child(self, fd):
@@ -70,5 +71,9 @@ class Popen(object):
                 os._exit(code)
         else:
             os.close(child_w)
-            util.Finalize(self, os.close, (parent_r,))
+            self.finalizer = util.Finalize(self, os.close, (parent_r,))
             self.sentinel = parent_r
+
+    def close(self):
+        if self.finalizer is not None:
+            self.finalizer()
index fa8e574a34e834ce1144b737dca2836ee702b60a..a51a2771aed8cc0d9a94292b250a2130c4609459 100644 (file)
@@ -49,7 +49,7 @@ class Popen(popen_fork.Popen):
             set_spawning_popen(None)
 
         self.sentinel, w = forkserver.connect_to_new_process(self._fds)
-        util.Finalize(self, os.close, (self.sentinel,))
+        self.finalizer = util.Finalize(self, os.close, (self.sentinel,))
         with open(w, 'wb', closefd=True) as f:
             f.write(buf.getbuffer())
         self.pid = forkserver.read_signed(self.sentinel)
index 98f8f0ab334d2ff3f04753019a208bb56f03b061..38151060efa2e33f7d2532d53e25038962080a76 100644 (file)
@@ -62,7 +62,7 @@ class Popen(popen_fork.Popen):
                 f.write(fp.getbuffer())
         finally:
             if parent_r is not None:
-                util.Finalize(self, os.close, (parent_r,))
+                self.finalizer = util.Finalize(self, os.close, (parent_r,))
             for fd in (child_r, child_w, parent_w):
                 if fd is not None:
                     os.close(fd)
index 6fd588f542673ef89e3a2eeee694930e36115b80..ecb86e96ba43649e6e3fef500057c7b6c3b9040e 100644 (file)
@@ -56,7 +56,7 @@ class Popen(object):
             self.returncode = None
             self._handle = hp
             self.sentinel = int(hp)
-            util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
+            self.finalizer = util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
 
             # send information to child
             set_spawning_popen(self)
@@ -96,3 +96,6 @@ class Popen(object):
             except OSError:
                 if self.wait(timeout=1.0) is None:
                     raise
+
+    def close(self):
+        self.finalizer()
index 37365f2e42cb690c8f5be70b236ddd4afc0c196d..70bb50d99911cadae16fdc09453dff50e09cf46f 100644 (file)
@@ -76,6 +76,7 @@ class BaseProcess(object):
         self._config = _current_process._config.copy()
         self._parent_pid = os.getpid()
         self._popen = None
+        self._closed = False
         self._target = target
         self._args = tuple(args)
         self._kwargs = dict(kwargs)
@@ -85,6 +86,10 @@ class BaseProcess(object):
             self.daemon = daemon
         _dangling.add(self)
 
+    def _check_closed(self):
+        if self._closed:
+            raise ValueError("process object is closed")
+
     def run(self):
         '''
         Method to be run in sub-process; can be overridden in sub-class
@@ -96,6 +101,7 @@ class BaseProcess(object):
         '''
         Start child process
         '''
+        self._check_closed()
         assert self._popen is None, 'cannot start a process twice'
         assert self._parent_pid == os.getpid(), \
                'can only start a process object created by current process'
@@ -110,12 +116,14 @@ class BaseProcess(object):
         '''
         Terminate process; sends SIGTERM signal or uses TerminateProcess()
         '''
+        self._check_closed()
         self._popen.terminate()
 
     def join(self, timeout=None):
         '''
         Wait until child process terminates
         '''
+        self._check_closed()
         assert self._parent_pid == os.getpid(), 'can only join a child process'
         assert self._popen is not None, 'can only join a started process'
         res = self._popen.wait(timeout)
@@ -126,6 +134,7 @@ class BaseProcess(object):
         '''
         Return whether process is alive
         '''
+        self._check_closed()
         if self is _current_process:
             return True
         assert self._parent_pid == os.getpid(), 'can only test a child process'
@@ -134,6 +143,23 @@ class BaseProcess(object):
         self._popen.poll()
         return self._popen.returncode is None
 
+    def close(self):
+        '''
+        Close the Process object.
+
+        This method releases resources held by the Process object.  It is
+        an error to call this method if the child process is still running.
+        '''
+        if self._popen is not None:
+            if self._popen.poll() is None:
+                raise ValueError("Cannot close a process while it is still running. "
+                                 "You should first call join() or terminate().")
+            self._popen.close()
+            self._popen = None
+            del self._sentinel
+            _children.discard(self)
+        self._closed = True
+
     @property
     def name(self):
         return self._name
@@ -174,6 +200,7 @@ class BaseProcess(object):
         '''
         Return exit code of process or `None` if it has yet to stop
         '''
+        self._check_closed()
         if self._popen is None:
             return self._popen
         return self._popen.poll()
@@ -183,6 +210,7 @@ class BaseProcess(object):
         '''
         Return identifier (PID) of process or `None` if it has yet to start
         '''
+        self._check_closed()
         if self is _current_process:
             return os.getpid()
         else:
@@ -196,6 +224,7 @@ class BaseProcess(object):
         Return a file descriptor (Unix) or handle (Windows) suitable for
         waiting for process termination.
         '''
+        self._check_closed()
         try:
             return self._sentinel
         except AttributeError:
@@ -204,6 +233,8 @@ class BaseProcess(object):
     def __repr__(self):
         if self is _current_process:
             status = 'started'
+        elif self._closed:
+            status = 'closed'
         elif self._parent_pid != os.getpid():
             status = 'unknown'
         elif self._popen is None:
@@ -295,6 +326,7 @@ class _MainProcess(BaseProcess):
         self._name = 'MainProcess'
         self._parent_pid = None
         self._popen = None
+        self._closed = False
         self._config = {'authkey': AuthenticationString(os.urandom(32)),
                         'semprefix': '/mp'}
         # Note that some versions of FreeBSD only allow named
@@ -307,6 +339,9 @@ class _MainProcess(BaseProcess):
         # Everything in self._config will be inherited by descendant
         # processes.
 
+    def close(self):
+        pass
+
 
 _current_process = _MainProcess()
 _process_counter = itertools.count(1)
index 7148ea494891776f92761adcc19f9d9cce2f8642..d4a461dc5e68e713888432756c1f8e13f58b8916 100644 (file)
@@ -403,6 +403,42 @@ class _TestProcess(BaseTestCase):
         p.join()
         self.assertTrue(wait_for_handle(sentinel, timeout=1))
 
+    @classmethod
+    def _test_close(cls, rc=0, q=None):
+        if q is not None:
+            q.get()
+        sys.exit(rc)
+
+    def test_close(self):
+        if self.TYPE == "threads":
+            self.skipTest('test not appropriate for {}'.format(self.TYPE))
+        q = self.Queue()
+        p = self.Process(target=self._test_close, kwargs={'q': q})
+        p.daemon = True
+        p.start()
+        self.assertEqual(p.is_alive(), True)
+        # Child is still alive, cannot close
+        with self.assertRaises(ValueError):
+            p.close()
+
+        q.put(None)
+        p.join()
+        self.assertEqual(p.is_alive(), False)
+        self.assertEqual(p.exitcode, 0)
+        p.close()
+        with self.assertRaises(ValueError):
+            p.is_alive()
+        with self.assertRaises(ValueError):
+            p.join()
+        with self.assertRaises(ValueError):
+            p.terminate()
+        p.close()
+
+        wr = weakref.ref(p)
+        del p
+        gc.collect()
+        self.assertIs(wr(), None)
+
     def test_many_processes(self):
         if self.TYPE == 'threads':
             self.skipTest('test not appropriate for {}'.format(self.TYPE))
diff --git a/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst b/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst
new file mode 100644 (file)
index 0000000..6b9e9a1
--- /dev/null
@@ -0,0 +1 @@
+Add a ``close()`` method to ``multiprocessing.Process``.