assert handle is not None
return handle
else:
- event = multiprocessing.Event()
+ event = self.Event[0]()
self.CALL_LOCKS[id(event)] = event
return id(event)
else:
self.CALL_LOCKS[handle].set()
- def __init__(self, manual_finish=False, result=42):
+ def __init__(self, Event, manual_finish=False, result=42):
+ self.Event = Event
self._called_event = self._create_event()
self._can_finish = self._create_event()
raise ZeroDivisionError()
class MapCall(Call):
- def __init__(self, result=42):
- super().__init__(manual_finish=True, result=result)
+ def __init__(self, Event, result=42):
+ super().__init__(Event, manual_finish=True, result=result)
def __call__(self, manual_finish):
if manual_finish:
def _start_some_futures(self):
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
+ call3 = Call(self.Event, manual_finish=True)
try:
self.executor.submit(call1)
call2.close()
call3.close()
-class ThreadPoolShutdownTest(ExecutorShutdownTest):
+class ThreadPoolMixin:
+ # wrap in tuple to prevent creation of instance methods
+ Event = (threading.Event,)
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=5)
def tearDown(self):
self.executor.shutdown(wait=True)
+class ProcessPoolMixin:
+ # wrap in tuple to prevent creation of instance methods
+ Event = (multiprocessing.Event,)
+ def setUp(self):
+ try:
+ self.executor = futures.ProcessPoolExecutor(max_workers=5)
+ except NotImplementedError as e:
+ self.skipTest(str(e))
+
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
def test_threads_terminate(self):
self._start_some_futures()
self.assertEqual(len(self.executor._threads), 3)
for t in threads:
t.join()
-class ProcessPoolShutdownTest(ExecutorShutdownTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=5)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
def test_processes_terminate(self):
self._start_some_futures()
self.assertEqual(len(self.executor._processes), 5)
pass
call1.set_can()
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
call2.close()
def test_first_completed_one_already_completed(self):
- call1 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
call1.set_can()
call2.set_can()
- call1 = Call(manual_finish=True)
- call2 = ExceptionCall(manual_finish=True)
- call3 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = ExceptionCall(self.Event, manual_finish=True)
+ call3 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
pass
call1.set_can()
- call1 = ExceptionCall(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = ExceptionCall(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
call2.close()
def test_first_exception_one_already_failed(self):
- call1 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
call1.set_can()
call2.set_can()
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
'this test assumes that future4 will be cancelled before it is '
'queued to run - which might not be the case if '
'ProcessPoolExecutor is too aggresive in scheduling futures')
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
- call4 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
+ call3 = Call(self.Event, manual_finish=True)
+ call4 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
pass
call1.set_can()
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
call2.close()
-class ThreadPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+ pass
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
-class ProcessPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+ pass
class AsCompletedTests(unittest.TestCase):
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
call1.set_can()
call2.set_can()
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
call2.close()
def test_zero_timeout(self):
- call1 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
try:
future1 = self.executor.submit(call1)
completed_futures = set()
finally:
call1.close()
-class ThreadPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+ pass
-class ProcessPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+ pass
class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by
def test_map_timeout(self):
results = []
- timeout_call = MapCall()
+ timeout_call = MapCall(self.Event)
try:
try:
for i in self.executor.map(timeout_call,
self.assertEqual([42, 42], results)
-class ThreadPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
+ pass
-class ProcessPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
+ pass
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):