executor_type = futures.ProcessPoolExecutor
-class ExecutorShutdownTest(unittest.TestCase):
+class ExecutorShutdownTest:
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
f.result()
-class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
def _prime_executor(self):
pass
t.join()
-class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
def _prime_executor(self):
pass
p.join()
-class WaitTests(unittest.TestCase):
+class WaitTests:
def test_first_completed(self):
future1 = self.executor.submit(mul, 21, 2)
self.assertEqual(set([future2]), pending)
-class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
sys.setswitchinterval(oldswitchinterval)
-class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
pass
-class AsCompletedTests(unittest.TestCase):
+class AsCompletedTests:
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
def test_no_timeout(self):
future1 = self.executor.submit(mul, 2, 21)
completed_futures)
-class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
pass
-class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
pass
-class ExecutorTest(unittest.TestCase):
+class ExecutorTest:
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
self.executor.shutdown()
-class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
def test_map_submits_without_iteration(self):
"""Tests verifying issue 11777."""
finished = []
self.assertCountEqual(finished, range(10))
-class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
def test_killed_child(self):
# When a child process is abruptly terminated, the whole pool gets
# "broken".
@test.support.reap_threads
def test_main():
try:
- test.support.run_unittest(ProcessPoolExecutorTest,
- ThreadPoolExecutorTest,
- ProcessPoolWaitTests,
- ThreadPoolWaitTests,
- ProcessPoolAsCompletedTests,
- ThreadPoolAsCompletedTests,
- FutureTests,
- ProcessPoolShutdownTest,
- ThreadPoolShutdownTest,
- )
+ test.support.run_unittest(__name__)
finally:
test.support.reap_children()