import contextlib
import itertools
+import logging
+from logging.handlers import QueueHandler
import os
+import queue
import sys
import threading
import time
def get_init_status():
return INITIALIZER_STATUS
-def init_fail():
+def init_fail(log_queue=None):
+ if log_queue is not None:
+ logger = logging.getLogger('concurrent.futures')
+ logger.addHandler(QueueHandler(log_queue))
+ logger.setLevel('CRITICAL')
+ logger.propagate = False
time.sleep(0.1) # let some futures be scheduled
raise ValueError('error in initializer')
super().setUp()
self.t1 = time.time()
- try:
- if hasattr(self, "ctx"):
- self.executor = self.executor_type(
- max_workers=self.worker_count,
- mp_context=get_context(self.ctx),
- **self.executor_kwargs)
- else:
- self.executor = self.executor_type(
- max_workers=self.worker_count,
- **self.executor_kwargs)
- except NotImplementedError as e:
- self.skipTest(str(e))
+ if hasattr(self, "ctx"):
+ self.executor = self.executor_type(
+ max_workers=self.worker_count,
+ mp_context=self.get_context(),
+ **self.executor_kwargs)
+ else:
+ self.executor = self.executor_type(
+ max_workers=self.worker_count,
+ **self.executor_kwargs)
self._prime_executor()
def tearDown(self):
super().tearDown()
+ def get_context(self):
+ return get_context(self.ctx)
+
def _prime_executor(self):
# Make sure that the executor is ready to do work before running the
# tests. This should reduce the probability of timeouts in the tests.
executor_type = futures.ProcessPoolExecutor
ctx = "fork"
- def setUp(self):
+ def get_context(self):
if sys.platform == "win32":
self.skipTest("require unix system")
- super().setUp()
+ return super().get_context()
class ProcessPoolSpawnMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "forkserver"
- def setUp(self):
+ def get_context(self):
if sys.platform == "win32":
self.skipTest("require unix system")
- super().setUp()
+ return super().get_context()
def create_executor_tests(mixin, bases=(BaseTestCase,),
worker_count = 2
def setUp(self):
- self.executor_kwargs = dict(initializer=init_fail)
+ if hasattr(self, "ctx"):
+ # Pass a queue to redirect the child's logging output
+ self.mp_context = self.get_context()
+ self.log_queue = self.mp_context.Queue()
+ self.executor_kwargs = dict(initializer=init_fail,
+ initargs=(self.log_queue,))
+ else:
+ # In a thread pool, the child shares our logging setup
+ # (see _assert_logged())
+ self.mp_context = None
+ self.log_queue = None
+ self.executor_kwargs = dict(initializer=init_fail)
super().setUp()
def test_initializer(self):
@contextlib.contextmanager
def _assert_logged(self, msg):
- if self.executor_type is futures.ProcessPoolExecutor:
- # No easy way to catch the child processes' stderr
+ if self.log_queue is not None:
yield
+ output = []
+ try:
+ while True:
+ output.append(self.log_queue.get_nowait().getMessage())
+ except queue.Empty:
+ pass
else:
with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
yield
- self.assertTrue(any(msg in line for line in cm.output),
- cm.output)
+ output = cm.output
+ self.assertTrue(any(msg in line for line in output),
+ output)
create_executor_tests(InitializerMixin)