]> granicus.if.org Git - python/commitdiff
bpo-19675: Terminate processes if construction of a pool is failing. (GH-5614)
authorJulien Palard <julien@palard.fr>
Sun, 4 Nov 2018 22:40:32 +0000 (23:40 +0100)
committerGitHub <noreply@github.com>
Sun, 4 Nov 2018 22:40:32 +0000 (23:40 +0100)
Lib/multiprocessing/pool.py
Lib/test/_test_multiprocessing.py
Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst [new file with mode: 0644]

index 574b5db5afb6846b9103a1a3bb190fb75686bec3..7a6d014901463e6fd578d30b9b5e24d115cf1783 100644 (file)
@@ -174,7 +174,15 @@ class Pool(object):
 
         self._processes = processes
         self._pool = []
-        self._repopulate_pool()
+        try:
+            self._repopulate_pool()
+        except Exception:
+            for p in self._pool:
+                if p.exitcode is None:
+                    p.terminate()
+            for p in self._pool:
+                p.join()
+            raise
 
         self._worker_handler = threading.Thread(
             target=Pool._handle_workers,
@@ -251,10 +259,10 @@ class Pool(object):
                               initargs, maxtasksperchild,
                               wrap_exception)
                        )
-            pool.append(w)
             w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
             w.start()
+            pool.append(w)
             util.debug('added worker')
 
     @staticmethod
index dc59e9fd740a0fc5f9b42014176d4648e313948b..7993fcb08e465a36ab1f65cfe50a3d421c9fbb6c 100644 (file)
@@ -3,6 +3,7 @@
 #
 
 import unittest
+import unittest.mock
 import queue as pyqueue
 import contextlib
 import time
@@ -4635,6 +4636,48 @@ class TestSimpleQueue(unittest.TestCase):
         proc.join()
 
 
+class TestPoolNotLeakOnFailure(unittest.TestCase):
+
+    def test_release_unused_processes(self):
+        # Issue #19675: During pool creation, if we can't create a process,
+        # don't leak already created ones.
+        will_fail_in = 3
+        forked_processes = []
+
+        class FailingForkProcess:
+            def __init__(self, **kwargs):
+                self.name = 'Fake Process'
+                self.exitcode = None
+                self.state = None
+                forked_processes.append(self)
+
+            def start(self):
+                nonlocal will_fail_in
+                if will_fail_in <= 0:
+                    raise OSError("Manually induced OSError")
+                will_fail_in -= 1
+                self.state = 'started'
+
+            def terminate(self):
+                self.state = 'stopping'
+
+            def join(self):
+                if self.state == 'stopping':
+                    self.state = 'stopped'
+
+            def is_alive(self):
+                return self.state == 'started' or self.state == 'stopping'
+
+        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
+            p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
+                Process=FailingForkProcess))
+            p.close()
+            p.join()
+        self.assertFalse(
+            any(process.is_alive() for process in forked_processes))
+
+
+
 class MiscTestCase(unittest.TestCase):
     def test__all__(self):
         # Just make sure names in blacklist are excluded
diff --git a/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst b/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst
new file mode 100644 (file)
index 0000000..958550d
--- /dev/null
@@ -0,0 +1 @@
+``multiprocessing.Pool`` no longer leaks processes if its initialization fails.