From e2b340ab4196e1beb902327f503574b5d7369185 Mon Sep 17 00:00:00 2001 From: Elvis Pranskevichus Date: Tue, 29 May 2018 17:31:01 -0400 Subject: [PATCH] bpo-32751: Wait for task cancellation in asyncio.wait_for() (GH-7216) Currently, asyncio.wait_for(fut), upon reaching the timeout deadline, cancels the future and returns immediately. This is problematic for when *fut* is a Task, because it will be left running for an arbitrary amount of time. This behavior is iself surprising and may lead to related bugs such as the one described in bpo-33638: condition = asyncio.Condition() async with condition: await asyncio.wait_for(condition.wait(), timeout=0.5) Currently, instead of raising a TimeoutError, the above code will fail with `RuntimeError: cannot wait on un-acquired lock`, because `__aexit__` is reached _before_ `condition.wait()` finishes its cancellation and re-acquires the condition lock. To resolve this, make `wait_for` await for the task cancellation. The tradeoff here is that the `timeout` promise may be broken if the task decides to handle its cancellation in a slow way. This represents a behavior change and should probably not be back-patched to 3.6 and earlier. --- Doc/library/asyncio-task.rst | 9 ++- Lib/asyncio/tasks.py | 23 +++++++- Lib/test/test_asyncio/test_locks.py | 13 +++++ Lib/test/test_asyncio/test_tasks.py | 56 +++++++++++++++++++ .../2018-05-29-15-32-18.bpo-32751.oBTqr7.rst | 2 + 5 files changed, 100 insertions(+), 3 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-05-29-15-32-18.bpo-32751.oBTqr7.rst diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index dc450c375a..3121b47183 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -790,7 +790,9 @@ Task functions Returns result of the Future or coroutine. When a timeout occurs, it cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task - cancellation, wrap it in :func:`shield`. + cancellation, wrap it in :func:`shield`. The function will wait until + the future is actually cancelled, so the total wait time may exceed + the *timeout*. If the wait is cancelled, the future *fut* is also cancelled. @@ -800,3 +802,8 @@ Task functions .. versionchanged:: 3.4.3 If the wait is cancelled, the future *fut* is now also cancelled. + + .. versionchanged:: 3.7 + When *fut* is cancelled due to a timeout, ``wait_for`` now waits + for *fut* to be cancelled. Previously, + it raised :exc:`~asyncio.TimeoutError` immediately. diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 6cef33d521..72792a25cf 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -412,14 +412,17 @@ async def wait_for(fut, timeout, *, loop=None): return fut.result() else: fut.remove_done_callback(cb) - fut.cancel() + # We must ensure that the task is not running + # after wait_for() returns. + # See https://bugs.python.org/issue32751 + await _cancel_and_wait(fut, loop=loop) raise futures.TimeoutError() finally: timeout_handle.cancel() async def _wait(fs, timeout, return_when, loop): - """Internal helper for wait() and wait_for(). + """Internal helper for wait(). The fs argument must be a collection of Futures. """ @@ -461,6 +464,22 @@ async def _wait(fs, timeout, return_when, loop): return done, pending +async def _cancel_and_wait(fut, loop): + """Cancel the *fut* future or task and wait until it completes.""" + + waiter = loop.create_future() + cb = functools.partial(_release_waiter, waiter) + fut.add_done_callback(cb) + + try: + fut.cancel() + # We cannot wait on *fut* directly to make + # sure _cancel_and_wait itself is reliably cancellable. + await waiter + finally: + fut.remove_done_callback(cb) + + # This is *not* a @coroutine! It is just an iterator (yielding Futures). def as_completed(fs, *, loop=None, timeout=None): """Return an iterator whose values are coroutines. diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py index 8642aa86b9..b8d155e1d0 100644 --- a/Lib/test/test_asyncio/test_locks.py +++ b/Lib/test/test_asyncio/test_locks.py @@ -807,6 +807,19 @@ class ConditionTests(test_utils.TestCase): with self.assertRaises(ValueError): asyncio.Condition(lock, loop=loop) + def test_timeout_in_block(self): + loop = asyncio.new_event_loop() + self.addCleanup(loop.close) + + async def task_timeout(): + condition = asyncio.Condition(loop=loop) + async with condition: + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(condition.wait(), timeout=0.5, + loop=loop) + + loop.run_until_complete(task_timeout()) + class SemaphoreTests(test_utils.TestCase): diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 1280584d31..1282a98c21 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -789,6 +789,62 @@ class BaseTaskTests: res = loop.run_until_complete(task) self.assertEqual(res, "ok") + def test_wait_for_waits_for_task_cancellation(self): + loop = asyncio.new_event_loop() + self.addCleanup(loop.close) + + task_done = False + + async def foo(): + async def inner(): + nonlocal task_done + try: + await asyncio.sleep(0.2, loop=loop) + finally: + task_done = True + + inner_task = self.new_task(loop, inner()) + + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(inner_task, timeout=0.1, loop=loop) + + self.assertTrue(task_done) + + loop.run_until_complete(foo()) + + def test_wait_for_self_cancellation(self): + loop = asyncio.new_event_loop() + self.addCleanup(loop.close) + + async def foo(): + async def inner(): + try: + await asyncio.sleep(0.3, loop=loop) + except asyncio.CancelledError: + try: + await asyncio.sleep(0.3, loop=loop) + except asyncio.CancelledError: + await asyncio.sleep(0.3, loop=loop) + + return 42 + + inner_task = self.new_task(loop, inner()) + + wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop) + + # Test that wait_for itself is properly cancellable + # even when the initial task holds up the initial cancellation. + task = self.new_task(loop, wait) + await asyncio.sleep(0.2, loop=loop) + task.cancel() + + with self.assertRaises(asyncio.CancelledError): + await task + + self.assertEqual(await inner_task, 42) + + loop.run_until_complete(foo()) + def test_wait(self): def gen(): diff --git a/Misc/NEWS.d/next/Library/2018-05-29-15-32-18.bpo-32751.oBTqr7.rst b/Misc/NEWS.d/next/Library/2018-05-29-15-32-18.bpo-32751.oBTqr7.rst new file mode 100644 index 0000000000..3e27cd461c --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-05-29-15-32-18.bpo-32751.oBTqr7.rst @@ -0,0 +1,2 @@ +When cancelling the task due to a timeout, :meth:`asyncio.wait_for` will now +wait until the cancellation is complete. -- 2.40.0