self._processes = processes
self._pool = []
- self._repopulate_pool()
+ try:
+ self._repopulate_pool()
+ except Exception:
+ for p in self._pool:
+ if p.exitcode is None:
+ p.terminate()
+ for p in self._pool:
+ p.join()
+ raise
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
initargs, maxtasksperchild,
wrap_exception)
)
- pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
+ pool.append(w)
util.debug('added worker')
@staticmethod
#
import unittest
+import unittest.mock
import queue as pyqueue
import contextlib
import time
proc.join()
+class TestPoolNotLeakOnFailure(unittest.TestCase):
+
+ def test_release_unused_processes(self):
+ # Issue #19675: During pool creation, if we can't create a process,
+ # don't leak already created ones.
+ will_fail_in = 3
+ forked_processes = []
+
+ class FailingForkProcess:
+ def __init__(self, **kwargs):
+ self.name = 'Fake Process'
+ self.exitcode = None
+ self.state = None
+ forked_processes.append(self)
+
+ def start(self):
+ nonlocal will_fail_in
+ if will_fail_in <= 0:
+ raise OSError("Manually induced OSError")
+ will_fail_in -= 1
+ self.state = 'started'
+
+ def terminate(self):
+ self.state = 'stopping'
+
+ def join(self):
+ if self.state == 'stopping':
+ self.state = 'stopped'
+
+ def is_alive(self):
+ return self.state == 'started' or self.state == 'stopping'
+
+ with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
+ p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
+ Process=FailingForkProcess))
+ p.close()
+ p.join()
+ self.assertFalse(
+ any(process.is_alive() for process in forked_processes))
+
+
+
class MiscTestCase(unittest.TestCase):
def test__all__(self):
# Just make sure names in blacklist are excluded