]> granicus.if.org Git - python/commitdiff
bpo-34037: Fix test_asyncio failure and add loop.shutdown_default_executor() (GH...
authorKyle Stanley <aeros167@gmail.com>
Thu, 19 Sep 2019 12:47:22 +0000 (08:47 -0400)
committerAndrew Svetlov <andrew.svetlov@gmail.com>
Thu, 19 Sep 2019 12:47:22 +0000 (15:47 +0300)
Doc/library/asyncio-eventloop.rst
Doc/library/asyncio-task.rst
Lib/asyncio/base_events.py
Lib/asyncio/events.py
Lib/asyncio/runners.py
Misc/NEWS.d/next/Library/2019-09-11-21-38-41.bpo-34037.LIAS_3.rst [new file with mode: 0644]

index 8f7974be66eaa6c6a3eee1a629af83085a69d737..2fd4cf30bd450f85e5549fb994f1883be6188611 100644 (file)
@@ -167,6 +167,18 @@ Running and stopping the loop
 
    .. versionadded:: 3.6
 
+.. coroutinemethod:: loop.shutdown_default_executor()
+
+   Schedule the closure of the default executor and wait for it to join all of
+   the threads in the :class:`ThreadPoolExecutor`. After calling this method, a
+   :exc:`RuntimeError` will be raised if :meth:`loop.run_in_executor` is called
+   while using the default executor.
+
+   Note that there is no need to call this function when
+   :func:`asyncio.run` is used.
+
+   .. versionadded:: 3.9
+
 
 Scheduling callbacks
 ^^^^^^^^^^^^^^^^^^^^
index 57e0e07ad3bb6980fb5878f0bb39e427579c4ca1..d9320422db3b8b58bac40be4af7ec7fa00a35e74 100644 (file)
@@ -213,8 +213,8 @@ Running an asyncio Program
 .. function:: run(coro, \*, debug=False)
 
     This function runs the passed coroutine, taking care of
-    managing the asyncio event loop and *finalizing asynchronous
-    generators*.
+    managing the asyncio event loop, *finalizing asynchronous
+    generators*, and closing the threadpool.
 
     This function cannot be called when another asyncio event loop is
     running in the same thread.
@@ -229,6 +229,8 @@ Running an asyncio Program
        **Important:** this function has been added to asyncio in
        Python 3.7 on a :term:`provisional basis <provisional api>`.
 
+    .. versionchanged:: 3.9
+       Updated to use :meth:`loop.shutdown_default_executor`.
 
 Creating Tasks
 ==============
index 14b80bdda9c039f2215dbcb9ae997d9f4440ba0a..031071281b38f72c2d179ab85e7e95addee4f1e2 100644 (file)
@@ -406,6 +406,8 @@ class BaseEventLoop(events.AbstractEventLoop):
         self._asyncgens = weakref.WeakSet()
         # Set to True when `loop.shutdown_asyncgens` is called.
         self._asyncgens_shutdown_called = False
+        # Set to True when `loop.shutdown_default_executor` is called.
+        self._executor_shutdown_called = False
 
     def __repr__(self):
         return (
@@ -503,6 +505,10 @@ class BaseEventLoop(events.AbstractEventLoop):
         if self._closed:
             raise RuntimeError('Event loop is closed')
 
+    def _check_default_executor(self):
+        if self._executor_shutdown_called:
+            raise RuntimeError('Executor shutdown has been called')
+
     def _asyncgen_finalizer_hook(self, agen):
         self._asyncgens.discard(agen)
         if not self.is_closed():
@@ -543,6 +549,26 @@ class BaseEventLoop(events.AbstractEventLoop):
                     'asyncgen': agen
                 })
 
+    async def shutdown_default_executor(self):
+        """Schedule the shutdown of the default executor."""
+        self._executor_shutdown_called = True
+        if self._default_executor is None:
+            return
+        future = self.create_future()
+        thread = threading.Thread(target=self._do_shutdown, args=(future,))
+        thread.start()
+        try:
+            await future
+        finally:
+            thread.join()
+
+    def _do_shutdown(self, future):
+        try:
+            self._default_executor.shutdown(wait=True)
+            self.call_soon_threadsafe(future.set_result, None)
+        except Exception as ex:
+            self.call_soon_threadsafe(future.set_exception, ex)
+
     def run_forever(self):
         """Run until stop() is called."""
         self._check_closed()
@@ -632,6 +658,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         self._closed = True
         self._ready.clear()
         self._scheduled.clear()
+        self._executor_shutdown_called = True
         executor = self._default_executor
         if executor is not None:
             self._default_executor = None
@@ -768,6 +795,8 @@ class BaseEventLoop(events.AbstractEventLoop):
             self._check_callback(func, 'run_in_executor')
         if executor is None:
             executor = self._default_executor
+            # Only check when the default executor is being used
+            self._check_default_executor()
             if executor is None:
                 executor = concurrent.futures.ThreadPoolExecutor()
                 self._default_executor = executor
index 5fb546429cbee53ed55258aacc73470009b7582f..2f06c4ae795d2ad16b29959f2555a1e34cb07af2 100644 (file)
@@ -249,6 +249,10 @@ class AbstractEventLoop:
         """Shutdown all active asynchronous generators."""
         raise NotImplementedError
 
+    async def shutdown_default_executor(self):
+        """Schedule the shutdown of the default executor."""
+        raise NotImplementedError
+
     # Methods scheduling callbacks.  All these return Handles.
 
     def _timer_handle_cancelled(self, handle):
index 5fbab03dd001ac5ff1890a22b8f486cccae8cb4b..6c87747e770bb82d7b5d8410d0bda3721a894a14 100644 (file)
@@ -45,6 +45,7 @@ def run(main, *, debug=False):
         try:
             _cancel_all_tasks(loop)
             loop.run_until_complete(loop.shutdown_asyncgens())
+            loop.run_until_complete(loop.shutdown_default_executor())
         finally:
             events.set_event_loop(None)
             loop.close()
diff --git a/Misc/NEWS.d/next/Library/2019-09-11-21-38-41.bpo-34037.LIAS_3.rst b/Misc/NEWS.d/next/Library/2019-09-11-21-38-41.bpo-34037.LIAS_3.rst
new file mode 100644 (file)
index 0000000..7534516
--- /dev/null
@@ -0,0 +1,4 @@
+For :mod:`asyncio`, add a new coroutine :meth:`loop.shutdown_default_executor`.\r
+The new coroutine provides an API to schedule an executor shutdown that waits\r
+on the threadpool to finish closing. Also, :func:`asyncio.run` has been updated\r
+to utilize the new coroutine. Patch by Kyle Stanley.\r