# without thread support.
test.support.import_module('threading')
-import io
-import logging
-import multiprocessing
-import sys
import threading
import time
import unittest
-if sys.platform.startswith('win'):
- import ctypes
- import ctypes.wintypes
-
from concurrent import futures
from concurrent.futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
- LOGGER, wait)
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
import concurrent.futures.process
+
def create_future(state=PENDING, exception=None, result=None):
f = Future()
f._state = state
f._result = result
return f
+
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+
def mul(x, y):
return x * y
-class Call(object):
- """A call that can be submitted to a future.Executor for testing.
-
- The call signals when it is called and waits for an event before finishing.
- """
- CALL_LOCKS = {}
- def _create_event(self):
- if sys.platform.startswith('win'):
- class SECURITY_ATTRIBUTES(ctypes.Structure):
- _fields_ = [("nLength", ctypes.wintypes.DWORD),
- ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
- ("bInheritHandle", ctypes.wintypes.BOOL)]
-
- s = SECURITY_ATTRIBUTES()
- s.nLength = ctypes.sizeof(s)
- s.lpSecurityDescriptor = None
- s.bInheritHandle = True
-
- handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
- True,
- False,
- None)
- assert handle is not None
- return handle
- else:
- event = self.Event[0]()
- self.CALL_LOCKS[id(event)] = event
- return id(event)
-
- def _wait_on_event(self, handle):
- if sys.platform.startswith('win'):
- # WaitForSingleObject returns 0 if handle is signaled.
- r = ctypes.windll.kernel32.WaitForSingleObject(handle, 60 * 1000)
- if r != 0:
- message = (
- 'WaitForSingleObject({}, ...) failed with {}, '
- 'GetLastError() = {}'.format(
- handle, r, ctypes.GetLastError()))
- logging.critical(message)
- assert False, message
- else:
- self.CALL_LOCKS[handle].wait()
-
- def _signal_event(self, handle):
- if sys.platform.startswith('win'):
- r = ctypes.windll.kernel32.SetEvent(handle) # Returns 0 on failure.
- if r == 0:
- message = (
- 'SetEvent({}) failed with {}, GetLastError() = {}'.format(
- handle, r, ctypes.GetLastError()))
- logging.critical(message)
- assert False, message
- else:
- self.CALL_LOCKS[handle].set()
-
- def __init__(self, Event, manual_finish=False, result=42):
- self.Event = Event
- self._called_event = self._create_event()
- self._can_finish = self._create_event()
-
- self._result = result
-
- if not manual_finish:
- self._signal_event(self._can_finish)
- def wait_on_called(self):
- self._wait_on_event(self._called_event)
+def sleep_and_raise(t):
+ time.sleep(t)
+ raise Exception('this is an exception')
- def set_can(self):
- self._signal_event(self._can_finish)
- def __call__(self):
- self._signal_event(self._called_event)
- self._wait_on_event(self._can_finish)
+class ExecutorMixin:
+ worker_count = 5
+ def _prime_executor(self):
+ # Make sure that the executor is ready to do work before running the
+ # tests. This should reduce the probability of timeouts in the tests.
+ futures = [self.executor.submit(time.sleep, 0.1)
+ for _ in range(self.worker_count)]
- return self._result
+ for f in futures:
+ f.result()
- def close(self):
- self.set_can()
- if sys.platform.startswith('win'):
- ctypes.windll.kernel32.CloseHandle(self._called_event)
- ctypes.windll.kernel32.CloseHandle(self._can_finish)
- self._called_event = None
- self._can_finish = None
- else:
- del self.CALL_LOCKS[self._called_event]
- del self.CALL_LOCKS[self._can_finish]
-
-class ExceptionCall(Call):
- def __call__(self):
- self._signal_event(self._called_event)
- self._wait_on_event(self._can_finish)
- raise ZeroDivisionError()
-
-class MapCall(Call):
- def __init__(self, Event, result=42):
- super().__init__(Event, manual_finish=True, result=result)
- def __call__(self, manual_finish):
- if manual_finish:
- super().__call__()
- return self._result
-
-class ExecutorShutdownTest(unittest.TestCase):
- def test_run_after_shutdown(self):
- self.executor.shutdown()
- self.assertRaises(RuntimeError,
- self.executor.submit,
- pow, 2, 5)
-
-
- def _start_some_futures(self):
- call1 = Call(self.Event, manual_finish=True)
- call2 = Call(self.Event, manual_finish=True)
- call3 = Call(self.Event, manual_finish=True)
-
- try:
- self.executor.submit(call1)
- self.executor.submit(call2)
- self.executor.submit(call3)
-
- call1.wait_on_called()
- call2.wait_on_called()
- call3.wait_on_called()
-
- call1.set_can()
- call2.set_can()
- call3.set_can()
- finally:
- call1.close()
- call2.close()
- call3.close()
-
-class ThreadPoolMixin:
- # wrap in tuple to prevent creation of instance methods
- Event = (threading.Event,)
+class ThreadPoolMixin(ExecutorMixin):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=5)
+ self._prime_executor()
def tearDown(self):
self.executor.shutdown(wait=True)
-class ProcessPoolMixin:
- # wrap in tuple to prevent creation of instance methods
- Event = (multiprocessing.Event,)
+
+class ProcessPoolMixin(ExecutorMixin):
def setUp(self):
try:
self.executor = futures.ProcessPoolExecutor(max_workers=5)
except NotImplementedError as e:
self.skipTest(str(e))
+ self._prime_executor()
def tearDown(self):
self.executor.shutdown(wait=True)
+
+class ExecutorShutdownTest(unittest.TestCase):
+ def test_run_after_shutdown(self):
+ self.executor.shutdown()
+ self.assertRaises(RuntimeError,
+ self.executor.submit,
+ pow, 2, 5)
+
+
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
+ def _prime_executor(self):
+ pass
+
def test_threads_terminate(self):
- self._start_some_futures()
+ self.executor.submit(mul, 21, 2)
+ self.executor.submit(mul, 6, 7)
+ self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._threads), 3)
self.executor.shutdown()
for t in self.executor._threads:
for t in threads:
t.join()
+
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
+ def _prime_executor(self):
+ pass
+
def test_processes_terminate(self):
- self._start_some_futures()
+ self.executor.submit(mul, 21, 2)
+ self.executor.submit(mul, 6, 7)
+ self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._processes), 5)
processes = self.executor._processes
self.executor.shutdown()
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(max_workers=5) as e:
- executor = e
+ processes = e._processes
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- for p in self.executor._processes:
+ for p in processes:
p.join()
def test_del_shutdown(self):
class WaitTests(unittest.TestCase):
def test_first_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
+ future1 = self.executor.submit(mul, 21, 2)
+ future2 = self.executor.submit(time.sleep, 5)
- call1 = Call(self.Event, manual_finish=True)
- call2 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- done, not_done = futures.wait(
- [CANCELLED_FUTURE, future1, future2],
- return_when=futures.FIRST_COMPLETED)
-
- self.assertEqual(set([future1]), done)
- self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
- finally:
- call1.close()
- call2.close()
-
- def test_first_completed_one_already_completed(self):
- call1 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE, future1],
- return_when=futures.FIRST_COMPLETED)
+ self.assertEqual(set([future1]), done)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
- self.assertEqual(set([SUCCESSFUL_FUTURE]), finished)
- self.assertEqual(set([future1]), pending)
- finally:
- call1.close()
+ def test_first_completed_some_already_completed(self):
+ future1 = self.executor.submit(time.sleep, 2)
- def test_first_exception(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(self.Event, manual_finish=True)
- call2 = ExceptionCall(self.Event, manual_finish=True)
- call3 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
- future3 = self.executor.submit(call3)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [future1, future2, future3],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEqual(set([future1, future2]), finished)
- self.assertEqual(set([future3]), pending)
- finally:
- call1.close()
- call2.close()
- call3.close()
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
- def test_first_exception_some_already_complete(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
+ self.assertEqual(
+ set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+ finished)
+ self.assertEqual(set([future1]), pending)
- call1 = ExceptionCall(self.Event, manual_finish=True)
- call2 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
+ def test_first_exception(self):
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(sleep_and_raise, 5)
+ future3 = self.executor.submit(time.sleep, 10)
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2],
- return_when=futures.FIRST_EXCEPTION)
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
- self.assertEqual(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1]), finished)
- self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
+ self.assertEqual(set([future1, future2]), finished)
+ self.assertEqual(set([future3]), pending)
+
+ def test_first_exception_some_already_complete(self):
+ future1 = self.executor.submit(divmod, 21, 0)
+ future2 = self.executor.submit(time.sleep, 5)
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
- finally:
- call1.close()
- call2.close()
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
def test_first_exception_one_already_failed(self):
- call1 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
+ future1 = self.executor.submit(time.sleep, 2)
- finished, pending = futures.wait(
- [EXCEPTION_FUTURE, future1],
- return_when=futures.FIRST_EXCEPTION)
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
- self.assertEqual(set([EXCEPTION_FUTURE]), finished)
- self.assertEqual(set([future1]), pending)
- finally:
- call1.close()
+ self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+ self.assertEqual(set([future1]), pending)
def test_all_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(self.Event, manual_finish=True)
- call2 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [future1, future2],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEqual(set([future1, future2]), finished)
- self.assertEqual(set(), pending)
- finally:
- call1.close()
- call2.close()
-
- @unittest.skip # XXX skip the test for now as it hangs
- def test_all_completed_some_already_completed(self):
- def wait_test():
- while not future1._waiters:
- pass
-
- future4.cancel()
- call1.set_can()
- call2.set_can()
- call3.set_can()
-
- self.assertLessEqual(
- futures.process.EXTRA_QUEUED_CALLS,
- 1,
- 'this test assumes that future4 will be cancelled before it is '
- 'queued to run - which might not be the case if '
- 'ProcessPoolExecutor is too aggresive in scheduling futures')
- call1 = Call(self.Event, manual_finish=True)
- call2 = Call(self.Event, manual_finish=True)
- call3 = Call(self.Event, manual_finish=True)
- call4 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
- future3 = self.executor.submit(call3)
- future4 = self.executor.submit(call4)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2, future3, future4],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEqual(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2, future3, future4]),
- finished)
- self.assertEqual(set(), pending)
- finally:
- call1.close()
- call2.close()
- call3.close()
- call4.close()
+ future1 = self.executor.submit(divmod, 2, 0)
+ future2 = self.executor.submit(mul, 2, 21)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2]), finished)
+ self.assertEqual(set(), pending)
def test_timeout(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
-
- call1 = Call(self.Event, manual_finish=True)
- call2 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
-
- t = threading.Thread(target=wait_test)
- t.start()
- finished, pending = futures.wait(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2],
- timeout=5,
- return_when=futures.ALL_COMPLETED)
+ future1 = self.executor.submit(mul, 6, 7)
+ future2 = self.executor.submit(time.sleep, 10)
- self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1]), finished)
- self.assertEqual(set([future2]), pending)
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2],
+ timeout=5,
+ return_when=futures.ALL_COMPLETED)
-
- finally:
- call1.close()
- call2.close()
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([future2]), pending)
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
pass
+
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
pass
+
class AsCompletedTests(unittest.TestCase):
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
def test_no_timeout(self):
- def wait_test():
- while not future1._waiters:
- pass
- call1.set_can()
- call2.set_can()
-
- call1 = Call(self.Event, manual_finish=True)
- call2 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- future2 = self.executor.submit(call2)
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(mul, 7, 6)
+
+ completed = set(futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]))
+ self.assertEqual(set(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]),
+ completed)
- t = threading.Thread(target=wait_test)
- t.start()
- completed = set(futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2]))
- self.assertEqual(set(
+ def test_zero_timeout(self):
+ future1 = self.executor.submit(time.sleep, 2)
+ completed_futures = set()
+ try:
+ for future in futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
- future1, future2]),
- completed)
- finally:
- call1.close()
- call2.close()
+ future1],
+ timeout=0):
+ completed_futures.add(future)
+ except futures.TimeoutError:
+ pass
+
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]),
+ completed_futures)
- def test_zero_timeout(self):
- call1 = Call(self.Event, manual_finish=True)
- try:
- future1 = self.executor.submit(call1)
- completed_futures = set()
- try:
- for future in futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1],
- timeout=0):
- completed_futures.add(future)
- except futures.TimeoutError:
- pass
-
- self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE]),
- completed_futures)
- finally:
- call1.close()
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
pass
+
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
pass
+
class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_map_timeout(self):
results = []
- timeout_call = MapCall(self.Event)
try:
- try:
- for i in self.executor.map(timeout_call,
- [False, False, True],
- timeout=5):
- results.append(i)
- except futures.TimeoutError:
- pass
- else:
- self.fail('expected TimeoutError')
- finally:
- timeout_call.close()
-
- self.assertEqual([42, 42], results)
+ for i in self.executor.map(time.sleep,
+ [0, 0, 10],
+ timeout=5):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
+
+ self.assertEqual([None, None], results)
+
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
pass
+
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
pass
+
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):
callback_result = None