Returns result of the Future or coroutine. When a timeout occurs, it
cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
- cancellation, wrap it in :func:`shield`.
+ cancellation, wrap it in :func:`shield`. The function will wait until
+ the future is actually cancelled, so the total wait time may exceed
+ the *timeout*.
If the wait is cancelled, the future *fut* is also cancelled.
.. versionchanged:: 3.4.3
If the wait is cancelled, the future *fut* is now also cancelled.
+
+ .. versionchanged:: 3.7
+ When *fut* is cancelled due to a timeout, ``wait_for`` now waits
+ for *fut* to be cancelled. Previously,
+ it raised :exc:`~asyncio.TimeoutError` immediately.
return fut.result()
else:
fut.remove_done_callback(cb)
- fut.cancel()
+ # We must ensure that the task is not running
+ # after wait_for() returns.
+ # See https://bugs.python.org/issue32751
+ await _cancel_and_wait(fut, loop=loop)
raise futures.TimeoutError()
finally:
timeout_handle.cancel()
async def _wait(fs, timeout, return_when, loop):
- """Internal helper for wait() and wait_for().
+ """Internal helper for wait().
The fs argument must be a collection of Futures.
"""
return done, pending
+async def _cancel_and_wait(fut, loop):
+ """Cancel the *fut* future or task and wait until it completes."""
+
+ waiter = loop.create_future()
+ cb = functools.partial(_release_waiter, waiter)
+ fut.add_done_callback(cb)
+
+ try:
+ fut.cancel()
+ # We cannot wait on *fut* directly to make
+ # sure _cancel_and_wait itself is reliably cancellable.
+ await waiter
+ finally:
+ fut.remove_done_callback(cb)
+
+
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
"""Return an iterator whose values are coroutines.
with self.assertRaises(ValueError):
asyncio.Condition(lock, loop=loop)
+ def test_timeout_in_block(self):
+ loop = asyncio.new_event_loop()
+ self.addCleanup(loop.close)
+
+ async def task_timeout():
+ condition = asyncio.Condition(loop=loop)
+ async with condition:
+ with self.assertRaises(asyncio.TimeoutError):
+ await asyncio.wait_for(condition.wait(), timeout=0.5,
+ loop=loop)
+
+ loop.run_until_complete(task_timeout())
+
class SemaphoreTests(test_utils.TestCase):
res = loop.run_until_complete(task)
self.assertEqual(res, "ok")
+ def test_wait_for_waits_for_task_cancellation(self):
+ loop = asyncio.new_event_loop()
+ self.addCleanup(loop.close)
+
+ task_done = False
+
+ async def foo():
+ async def inner():
+ nonlocal task_done
+ try:
+ await asyncio.sleep(0.2, loop=loop)
+ finally:
+ task_done = True
+
+ inner_task = self.new_task(loop, inner())
+
+ with self.assertRaises(asyncio.TimeoutError):
+ await asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
+
+ self.assertTrue(task_done)
+
+ loop.run_until_complete(foo())
+
+ def test_wait_for_self_cancellation(self):
+ loop = asyncio.new_event_loop()
+ self.addCleanup(loop.close)
+
+ async def foo():
+ async def inner():
+ try:
+ await asyncio.sleep(0.3, loop=loop)
+ except asyncio.CancelledError:
+ try:
+ await asyncio.sleep(0.3, loop=loop)
+ except asyncio.CancelledError:
+ await asyncio.sleep(0.3, loop=loop)
+
+ return 42
+
+ inner_task = self.new_task(loop, inner())
+
+ wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
+
+ # Test that wait_for itself is properly cancellable
+ # even when the initial task holds up the initial cancellation.
+ task = self.new_task(loop, wait)
+ await asyncio.sleep(0.2, loop=loop)
+ task.cancel()
+
+ with self.assertRaises(asyncio.CancelledError):
+ await task
+
+ self.assertEqual(await inner_task, 42)
+
+ loop.run_until_complete(foo())
+
def test_wait(self):
def gen():
--- /dev/null
+When cancelling the task due to a timeout, :meth:`asyncio.wait_for` will now
+wait until the cancellation is complete.