From 601953b67958572162d0ab7d3f24c07340ad9dbb Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Mon, 5 Oct 2015 16:20:00 -0700 Subject: [PATCH] Docs and one small improvement for issue #25304, by Vincent Michel. --- Doc/library/asyncio-dev.rst | 12 ++++++--- Doc/library/asyncio-task.rst | 39 +++++++++++++++++++++++++++++ Lib/asyncio/tasks.py | 7 +++++- Lib/test/test_asyncio/test_tasks.py | 21 ++++++++++++++++ 4 files changed, 75 insertions(+), 4 deletions(-) diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index 4812c2f832..1d1f795294 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -96,10 +96,9 @@ the same thread. But when the task uses ``yield from``, the task is suspended and the event loop executes the next task. To schedule a callback from a different thread, the -:meth:`BaseEventLoop.call_soon_threadsafe` method should be used. Example to -schedule a coroutine from a different thread:: +:meth:`BaseEventLoop.call_soon_threadsafe` method should be used. Example:: - loop.call_soon_threadsafe(asyncio.async, coro_func()) + loop.call_soon_threadsafe(callback, *args) Most asyncio objects are not thread safe. You should only worry if you access objects outside the event loop. For example, to cancel a future, don't call @@ -107,6 +106,13 @@ directly its :meth:`Future.cancel` method, but:: loop.call_soon_threadsafe(fut.cancel) +To schedule a coroutine object from a different thread, the +:func:`run_coroutine_threadsafe` function should be used. It returns a +:class:`concurrent.futures.Future` to access the result:: + + future = asyncio.run_coroutine_threadsafe(coro_func(), loop) + result = future.result(timeout) # Wait for the result with a timeout + To handle signals and to execute subprocesses, the event loop must be run in the main thread. diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index e7ff7d2e2d..41bde257e1 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -661,3 +661,42 @@ Task functions .. versionchanged:: 3.4.3 If the wait is cancelled, the future *fut* is now also cancelled. + +.. function:: run_coroutine_threadsafe(coro, loop) + + Submit a :ref:`coroutine object ` to a given event loop. + + Return a :class:`concurrent.futures.Future` to access the result. + + This function is meant to be called from a different thread than the one + where the event loop is running. Usage:: + + # Create a coroutine + coro = asyncio.sleep(1, result=3) + # Submit the coroutine to a given loop + future = asyncio.run_coroutine_threadsafe(coro, loop) + # Wait for the result with an optional timeout argument + assert future.result(timeout) == 3 + + If an exception is raised in the coroutine, the returned future will be + notified. It can also be used to cancel the task in the event loop:: + + try: + result = future.result(timeout) + except asyncio.TimeoutError: + print('The coroutine took too long, cancelling the task...') + future.cancel() + except Exception as exc: + print('The coroutine raised an exception: {!r}'.format(exc)) + else: + print('The coroutine returned: {!r}'.format(result)) + + See the :ref:`concurrency and multithreading ` + section of the documentation. + + .. note:: + + Unlike the functions above, :func:`run_coroutine_threadsafe` requires the + *loop* argument to be passed explicitely. + + .. versionadded:: 3.4.4 diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 5a7bd9dbcb..b887d88934 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -704,7 +704,12 @@ def run_coroutine_threadsafe(coro, loop): future = concurrent.futures.Future() def callback(): - futures._chain_future(ensure_future(coro, loop=loop), future) + try: + futures._chain_future(ensure_future(coro, loop=loop), future) + except Exception as exc: + if future.set_running_or_notify_cancel(): + future.set_exception(exc) + raise loop.call_soon_threadsafe(callback) return future diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 8ec5d9c9fd..9772baed1c 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -2166,6 +2166,27 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): with self.assertRaises(asyncio.CancelledError): self.loop.run_until_complete(future) + def test_run_coroutine_threadsafe_task_factory_exception(self): + """Test coroutine submission from a tread to an event loop + when the task factory raise an exception.""" + # Clear the time generator + asyncio.ensure_future(self.add(1, 2), loop=self.loop) + # Schedule the target + future = self.loop.run_in_executor(None, self.target) + # Set corrupted task factory + self.loop.set_task_factory(lambda loop, coro: wrong_name) + # Set exception handler + callback = test_utils.MockCallback() + self.loop.set_exception_handler(callback) + # Run event loop + with self.assertRaises(NameError) as exc_context: + self.loop.run_until_complete(future) + # Check exceptions + self.assertIn('wrong_name', exc_context.exception.args[0]) + self.assertEqual(len(callback.call_args_list), 1) + (loop, context), kwargs = callback.call_args + self.assertEqual(context['exception'], exc_context.exception) + if __name__ == '__main__': unittest.main() -- 2.40.0