]> granicus.if.org Git - python/commitdiff
Issue #10798: Reject supporting concurrent.futures if the system has
authorMartin v. Löwis <martin@v.loewis.de>
Mon, 3 Jan 2011 00:07:01 +0000 (00:07 +0000)
committerMartin v. Löwis <martin@v.loewis.de>
Mon, 3 Jan 2011 00:07:01 +0000 (00:07 +0000)
too few POSIX semaphores.

Lib/concurrent/futures/process.py
Lib/test/test_concurrent_futures.py
Misc/NEWS

index f461b7777dbbb76a78beb3fb61a8414e0f42d418..79c60c3d105fcdb1c0a51a87b89351319485cfd9 100644 (file)
@@ -244,6 +244,31 @@ def _queue_manangement_worker(executor_reference,
             else:
                 work_item.future.set_result(result_item.result)
 
+_system_limits_checked = False
+_system_limited = None
+def _check_system_limits():
+    global _system_limits_checked, _system_limited
+    if _system_limits_checked:
+        if _system_limited:
+            raise NotImplementedError(_system_limited)
+    _system_limits_checked = True
+    try:
+        import os
+        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
+    except (AttributeError, ValueError):
+        # sysconf not available or setting not available
+        return
+    if nsems_max == -1:
+        # indetermine limit, assume that limit is determined
+        # by available memory only
+        return
+    if nsems_max >= 256:
+        # minimum number of semaphores available
+        # according to POSIX
+        return
+    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+    raise NotImplementedError(_system_limited)
+
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None):
         """Initializes a new ProcessPoolExecutor instance.
@@ -253,6 +278,7 @@ class ProcessPoolExecutor(_base.Executor):
                 execute the given calls. If None or not given then as many
                 worker processes will be created as the machine has processors.
         """
+        _check_system_limits()
         _remove_dead_thread_references()
 
         if max_workers is None:
index 6a95a362d444e16cbc34c15edf1c72ca0f3d4900..48f2e7efbdb2aacec993845bfb6c3c7181d531fb 100644 (file)
@@ -69,7 +69,7 @@ class Call(object):
             assert handle is not None
             return handle
         else:
-            event = multiprocessing.Event()
+            event = self.Event[0]()
             self.CALL_LOCKS[id(event)] = event
             return id(event)
 
@@ -99,7 +99,8 @@ class Call(object):
         else:
             self.CALL_LOCKS[handle].set()
 
-    def __init__(self, manual_finish=False, result=42):
+    def __init__(self, Event, manual_finish=False, result=42):
+        self.Event = Event
         self._called_event = self._create_event()
         self._can_finish = self._create_event()
 
@@ -138,8 +139,8 @@ class ExceptionCall(Call):
         raise ZeroDivisionError()
 
 class MapCall(Call):
-    def __init__(self, result=42):
-        super().__init__(manual_finish=True, result=result)
+    def __init__(self, Event, result=42):
+        super().__init__(Event, manual_finish=True, result=result)
 
     def __call__(self, manual_finish):
         if manual_finish:
@@ -155,9 +156,9 @@ class ExecutorShutdownTest(unittest.TestCase):
 
 
     def _start_some_futures(self):
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
-        call3 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
+        call2 = Call(self.Event, manual_finish=True)
+        call3 = Call(self.Event, manual_finish=True)
 
         try:
             self.executor.submit(call1)
@@ -176,13 +177,28 @@ class ExecutorShutdownTest(unittest.TestCase):
             call2.close()
             call3.close()
 
-class ThreadPoolShutdownTest(ExecutorShutdownTest):
+class ThreadPoolMixin:
+    # wrap in tuple to prevent creation of instance methods
+    Event = (threading.Event,)
     def setUp(self):
         self.executor = futures.ThreadPoolExecutor(max_workers=5)
 
     def tearDown(self):
         self.executor.shutdown(wait=True)
 
+class ProcessPoolMixin:
+    # wrap in tuple to prevent creation of instance methods
+    Event = (multiprocessing.Event,)
+    def setUp(self):
+        try:
+            self.executor = futures.ProcessPoolExecutor(max_workers=5)
+        except NotImplementedError as e:
+            self.skipTest(str(e))
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
     def test_threads_terminate(self):
         self._start_some_futures()
         self.assertEqual(len(self.executor._threads), 3)
@@ -208,13 +224,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest):
         for t in threads:
             t.join()
 
-class ProcessPoolShutdownTest(ExecutorShutdownTest):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=5)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
-
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
     def test_processes_terminate(self):
         self._start_some_futures()
         self.assertEqual(len(self.executor._processes), 5)
@@ -251,8 +261,8 @@ class WaitTests(unittest.TestCase):
                 pass
             call1.set_can()
 
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
+        call2 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
             future2 = self.executor.submit(call2)
@@ -270,7 +280,7 @@ class WaitTests(unittest.TestCase):
             call2.close()
 
     def test_first_completed_one_already_completed(self):
-        call1 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
 
@@ -290,9 +300,9 @@ class WaitTests(unittest.TestCase):
             call1.set_can()
             call2.set_can()
 
-        call1 = Call(manual_finish=True)
-        call2 = ExceptionCall(manual_finish=True)
-        call3 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
+        call2 = ExceptionCall(self.Event, manual_finish=True)
+        call3 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
             future2 = self.executor.submit(call2)
@@ -317,8 +327,8 @@ class WaitTests(unittest.TestCase):
                 pass
             call1.set_can()
 
-        call1 = ExceptionCall(manual_finish=True)
-        call2 = Call(manual_finish=True)
+        call1 = ExceptionCall(self.Event, manual_finish=True)
+        call2 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
             future2 = self.executor.submit(call2)
@@ -343,7 +353,7 @@ class WaitTests(unittest.TestCase):
             call2.close()
 
     def test_first_exception_one_already_failed(self):
-        call1 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
 
@@ -363,8 +373,8 @@ class WaitTests(unittest.TestCase):
             call1.set_can()
             call2.set_can()
 
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
+        call2 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
             future2 = self.executor.submit(call2)
@@ -397,10 +407,10 @@ class WaitTests(unittest.TestCase):
                'this test assumes that future4 will be cancelled before it is '
                'queued to run - which might not be the case if '
                'ProcessPoolExecutor is too aggresive in scheduling futures')
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
-        call3 = Call(manual_finish=True)
-        call4 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
+        call2 = Call(self.Event, manual_finish=True)
+        call3 = Call(self.Event, manual_finish=True)
+        call4 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
             future2 = self.executor.submit(call2)
@@ -432,8 +442,8 @@ class WaitTests(unittest.TestCase):
                 pass
             call1.set_can()
 
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
+        call2 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
             future2 = self.executor.submit(call2)
@@ -460,19 +470,11 @@ class WaitTests(unittest.TestCase):
             call2.close()
 
 
-class ThreadPoolWaitTests(WaitTests):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=1)
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+    pass
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
-
-class ProcessPoolWaitTests(WaitTests):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+    pass
 
 class AsCompletedTests(unittest.TestCase):
     # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
@@ -483,8 +485,8 @@ class AsCompletedTests(unittest.TestCase):
             call1.set_can()
             call2.set_can()
 
-        call1 = Call(manual_finish=True)
-        call2 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
+        call2 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
             future2 = self.executor.submit(call2)
@@ -507,7 +509,7 @@ class AsCompletedTests(unittest.TestCase):
             call2.close()
 
     def test_zero_timeout(self):
-        call1 = Call(manual_finish=True)
+        call1 = Call(self.Event, manual_finish=True)
         try:
             future1 = self.executor.submit(call1)
             completed_futures = set()
@@ -529,19 +531,11 @@ class AsCompletedTests(unittest.TestCase):
         finally:
             call1.close()
 
-class ThreadPoolAsCompletedTests(AsCompletedTests):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+    pass
 
-class ProcessPoolAsCompletedTests(AsCompletedTests):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+    pass
 
 class ExecutorTest(unittest.TestCase):
     # Executor.shutdown() and context manager usage is tested by
@@ -567,7 +561,7 @@ class ExecutorTest(unittest.TestCase):
 
     def test_map_timeout(self):
         results = []
-        timeout_call = MapCall()
+        timeout_call = MapCall(self.Event)
         try:
             try:
                 for i in self.executor.map(timeout_call,
@@ -583,19 +577,11 @@ class ExecutorTest(unittest.TestCase):
 
         self.assertEqual([42, 42], results)
 
-class ThreadPoolExecutorTest(ExecutorTest):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
+    pass
 
-class ProcessPoolExecutorTest(ExecutorTest):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
+    pass
 
 class FutureTests(unittest.TestCase):
     def test_done_callback_with_result(self):
index 2029f1b5dc92c7f854b40ed1a628239fc7c07c62..75083cf314c5f1dc12b7aa4162c3c4934358ad21 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -20,6 +20,9 @@ Core and Builtins
 Library
 -------
 
+- Issue #10798: Reject supporting concurrent.futures if the system has too
+  few POSIX semaphores.
+
 - Issue #10807: Remove base64, bz2, hex, quopri, rot13, uu and zlib codecs from
   the codec aliases. They are still accessible via codecs.lookup().