]> granicus.if.org Git - python/commitdiff
bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit...
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Tue, 10 Apr 2018 18:35:02 +0000 (11:35 -0700)
committerAntoine Pitrou <pitrou@free.fr>
Tue, 10 Apr 2018 18:35:02 +0000 (20:35 +0200)
Executors in concurrent.futures accepted tasks after executor was shutdown by interpreter exit. Tasks were left in PENDING state forever. This fix changes submit to instead raise a RuntimeError.
(cherry picked from commit c4b695f85e141f57d22d8edf7bc2c756da136918)

Co-authored-by: Mark Nemec <mrknmc@me.com>
Lib/concurrent/futures/process.py
Lib/concurrent/futures/thread.py
Lib/test/test_concurrent_futures.py
Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst [new file with mode: 0644]

index 63f22cfca3252c59cec7b9abf3ba84ecb47b4486..ce7d642b098a71f5150e64bd4d16c1f2f804ff47 100644 (file)
@@ -423,6 +423,10 @@ def _queue_management_worker(executor_reference,
         #   - The executor that owns this worker has been shutdown.
         if shutting_down():
             try:
+                # Flag the executor as shutting down as early as possible if it
+                # is not gc-ed yet.
+                if executor is not None:
+                    executor._shutdown_thread = True
                 # Since no new work items can be added, it is safe to shutdown
                 # this thread if there are no pending work items.
                 if not pending_work_items:
@@ -595,6 +599,9 @@ class ProcessPoolExecutor(_base.Executor):
                 raise BrokenProcessPool(self._broken)
             if self._shutdown_thread:
                 raise RuntimeError('cannot schedule new futures after shutdown')
+            if _global_shutdown:
+                raise RuntimeError('cannot schedule new futures after '
+                                   'interpreter shutdown')
 
             f = _base.Future()
             w = _WorkItem(f, fn, args, kwargs)
index 6e22950a157db6c80aba15047aa4846544d152c9..b65dee11f727279df53d4723eb3be40976534c5c 100644 (file)
@@ -87,6 +87,10 @@ def _worker(executor_reference, work_queue, initializer, initargs):
             #   - The executor that owns the worker has been collected OR
             #   - The executor that owns the worker has been shutdown.
             if _shutdown or executor is None or executor._shutdown:
+                # Flag the executor as shutting down as early as possible if it
+                # is not gc-ed yet.
+                if executor is not None:
+                    executor._shutdown = True
                 # Notice other workers
                 work_queue.put(None)
                 return
@@ -145,6 +149,9 @@ class ThreadPoolExecutor(_base.Executor):
 
             if self._shutdown:
                 raise RuntimeError('cannot schedule new futures after shutdown')
+            if _shutdown:
+                raise RuntimeError('cannot schedule new futures after'
+                                   'interpreter shutdown')
 
             f = _base.Future()
             w = _WorkItem(f, fn, args, kwargs)
index 18d0265f3f61a63ffdb39f30237972362648e0a3..b258a0eafde6d4445244083b1af920aca129956b 100644 (file)
@@ -303,6 +303,34 @@ class ExecutorShutdownTest:
         self.assertFalse(err)
         self.assertEqual(out.strip(), b"apple")
 
+    def test_submit_after_interpreter_shutdown(self):
+        # Test the atexit hook for shutdown of worker threads and processes
+        rc, out, err = assert_python_ok('-c', """if 1:
+            import atexit
+            @atexit.register
+            def run_last():
+                try:
+                    t.submit(id, None)
+                except RuntimeError:
+                    print("runtime-error")
+                    raise
+            from concurrent.futures import {executor_type}
+            if __name__ == "__main__":
+                context = '{context}'
+                if not context:
+                    t = {executor_type}(5)
+                else:
+                    from multiprocessing import get_context
+                    context = get_context(context)
+                    t = {executor_type}(5, mp_context=context)
+                    t.submit(id, 42).result()
+            """.format(executor_type=self.executor_type.__name__,
+                       context=getattr(self, "ctx", "")))
+        # Errors in atexit hooks don't change the process exit code, check
+        # stderr manually.
+        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
+        self.assertEqual(out.strip(), b"runtime-error")
+
     def test_hang_issue12364(self):
         fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
         self.executor.shutdown()
diff --git a/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst b/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst
new file mode 100644 (file)
index 0000000..d9411eb
--- /dev/null
@@ -0,0 +1,2 @@
+Raise RuntimeError when ``executor.submit`` is called during interpreter
+shutdown.