]> granicus.if.org Git - python/commitdiff
Add tests for the atexit hook in concurrent.futures (part of #11635)
authorAntoine Pitrou <solipsis@pitrou.net>
Thu, 24 Mar 2011 14:47:39 +0000 (15:47 +0100)
committerAntoine Pitrou <solipsis@pitrou.net>
Thu, 24 Mar 2011 14:47:39 +0000 (15:47 +0100)
Lib/test/test_concurrent_futures.py

index f639eec9e5a587aa0e3f1d4fbdbbd4cbd82ab49c..2662af76cff82d8ac48942d1a6bc008c4c9c33da 100644 (file)
@@ -9,6 +9,9 @@ test.support.import_module('multiprocessing.synchronize')
 # without thread support.
 test.support.import_module('threading')
 
+from test.script_helper import assert_python_ok
+
+import sys
 import threading
 import time
 import unittest
@@ -43,9 +46,30 @@ def sleep_and_raise(t):
     time.sleep(t)
     raise Exception('this is an exception')
 
+def sleep_and_print(t, msg):
+    time.sleep(t)
+    print(msg)
+    sys.stdout.flush()
+
 
 class ExecutorMixin:
     worker_count = 5
+
+    def setUp(self):
+        self.t1 = time.time()
+        try:
+            self.executor = self.executor_type(max_workers=self.worker_count)
+        except NotImplementedError as e:
+            self.skipTest(str(e))
+        self._prime_executor()
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+        dt = time.time() - self.t1
+        if test.support.verbose:
+            print("%.2fs" % dt, end=' ')
+        self.assertLess(dt, 60, "synchronization issue: test lasted too long")
+
     def _prime_executor(self):
         # Make sure that the executor is ready to do work before running the
         # tests. This should reduce the probability of timeouts in the tests.
@@ -57,24 +81,11 @@ class ExecutorMixin:
 
 
 class ThreadPoolMixin(ExecutorMixin):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=5)
-        self._prime_executor()
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+    executor_type = futures.ThreadPoolExecutor
 
 
 class ProcessPoolMixin(ExecutorMixin):
-    def setUp(self):
-        try:
-            self.executor = futures.ProcessPoolExecutor(max_workers=5)
-        except NotImplementedError as e:
-            self.skipTest(str(e))
-        self._prime_executor()
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+    executor_type = futures.ProcessPoolExecutor
 
 
 class ExecutorShutdownTest(unittest.TestCase):
@@ -84,6 +95,20 @@ class ExecutorShutdownTest(unittest.TestCase):
                           self.executor.submit,
                           pow, 2, 5)
 
+    def test_interpreter_shutdown(self):
+        # Test the atexit hook for shutdown of worker threads and processes
+        rc, out, err = assert_python_ok('-c', """if 1:
+            from concurrent.futures import {executor_type}
+            from time import sleep
+            from test.test_concurrent_futures import sleep_and_print
+            t = {executor_type}(5)
+            t.submit(sleep_and_print, 1.0, "apple")
+            """.format(executor_type=self.executor_type.__name__))
+        # Errors in atexit hooks don't change the process exit code, check
+        # stderr manually.
+        self.assertFalse(err)
+        self.assertEqual(out.strip(), b"apple")
+
 
 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
     def _prime_executor(self):