]> granicus.if.org Git - python/commitdiff
Sync with asyncio repo
authorAndrew Svetlov <andrew.svetlov@gmail.com>
Mon, 11 Jan 2016 06:42:49 +0000 (08:42 +0200)
committerAndrew Svetlov <andrew.svetlov@gmail.com>
Mon, 11 Jan 2016 06:42:49 +0000 (08:42 +0200)
Lib/asyncio/coroutines.py
Lib/asyncio/tasks.py
Lib/test/test_asyncio/test_tasks.py
Misc/NEWS

index 3a92c7d7552219a08c74e35939f20f5de85c6264..27ab42a5bfaf5dde6740bb5cc6a424e6d1dc7639 100644 (file)
@@ -27,8 +27,8 @@ _YIELD_FROM = opcode.opmap['YIELD_FROM']
 # before you define your coroutines.  A downside of using this feature
 # is that tracebacks show entries for the CoroWrapper.__next__ method
 # when _DEBUG is true.
-_DEBUG = (not sys.flags.ignore_environment
-          and bool(os.environ.get('PYTHONASYNCIODEBUG')))
+_DEBUG = (not sys.flags.ignore_environment and
+          bool(os.environ.get('PYTHONASYNCIODEBUG')))
 
 
 try:
@@ -86,7 +86,7 @@ class CoroWrapper:
     def __init__(self, gen, func=None):
         assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
         self.gen = gen
-        self.func = func # Used to unwrap @coroutine decorator
+        self.func = func  # Used to unwrap @coroutine decorator
         self._source_traceback = traceback.extract_stack(sys._getframe(1))
         self.__name__ = getattr(gen, '__name__', None)
         self.__qualname__ = getattr(gen, '__qualname__', None)
@@ -283,10 +283,13 @@ def _format_coroutine(coro):
         coro_frame = coro.cr_frame
 
     filename = coro_code.co_filename
-    if (isinstance(coro, CoroWrapper)
-    and not inspect.isgeneratorfunction(coro.func)
-    and coro.func is not None):
-        filename, lineno = events._get_function_source(coro.func)
+    lineno = 0
+    if (isinstance(coro, CoroWrapper) and
+            not inspect.isgeneratorfunction(coro.func) and
+            coro.func is not None):
+        source = events._get_function_source(coro.func)
+        if source is not None:
+            filename, lineno = source
         if coro_frame is None:
             coro_repr = ('%s done, defined at %s:%s'
                          % (coro_name, filename, lineno))
index a2ab8815b66053a6467a687571705405afd24b4d..3c25e2d27823bc5e96fe432ee07dcaa6a0e4e4de 100644 (file)
@@ -4,6 +4,7 @@ __all__ = ['Task',
            'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
            'wait', 'wait_for', 'as_completed', 'sleep', 'async',
            'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
+           'timeout',
            ]
 
 import concurrent.futures
@@ -732,3 +733,53 @@ def run_coroutine_threadsafe(coro, loop):
 
     loop.call_soon_threadsafe(callback)
     return future
+
+
+def timeout(timeout, *, loop=None):
+    """A factory which produce a context manager with timeout.
+
+    Useful in cases when you want to apply timeout logic around block
+    of code or in cases when asyncio.wait_for is not suitable.
+
+    For example:
+
+    >>> with asyncio.timeout(0.001):
+    >>>     yield from coro()
+
+
+    timeout: timeout value in seconds
+    loop: asyncio compatible event loop
+    """
+    if loop is None:
+        loop = events.get_event_loop()
+    return _Timeout(timeout, loop=loop)
+
+
+class _Timeout:
+    def __init__(self, timeout, *, loop):
+        self._timeout = timeout
+        self._loop = loop
+        self._task = None
+        self._cancelled = False
+        self._cancel_handler = None
+
+    def __enter__(self):
+        self._task = Task.current_task(loop=self._loop)
+        if self._task is None:
+            raise RuntimeError('Timeout context manager should be used '
+                               'inside a task')
+        self._cancel_handler = self._loop.call_later(
+            self._timeout, self._cancel_task)
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is futures.CancelledError and self._cancelled:
+            self._cancel_handler = None
+            self._task = None
+            raise futures.TimeoutError
+        self._cancel_handler.cancel()
+        self._cancel_handler = None
+        self._task = None
+
+    def _cancel_task(self):
+        self._cancelled = self._task.cancel()
index 5ee20f6a2979f8de8d7260a283abde2aab0f5bdc..42e30a4d7638edc5c2f1b5ac7cc6a625af8b068c 100644 (file)
@@ -6,6 +6,7 @@ import io
 import os
 import re
 import sys
+import time
 import types
 import unittest
 import weakref
@@ -2235,5 +2236,173 @@ class SleepTests(test_utils.TestCase):
         self.assertEqual(result, 11)
 
 
+class TimeoutTests(test_utils.TestCase):
+    def setUp(self):
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(None)
+
+    def tearDown(self):
+        self.loop.close()
+        self.loop = None
+
+    def test_timeout(self):
+        canceled_raised = [False]
+
+        @asyncio.coroutine
+        def long_running_task():
+            try:
+                yield from asyncio.sleep(10, loop=self.loop)
+            except asyncio.CancelledError:
+                canceled_raised[0] = True
+                raise
+
+        @asyncio.coroutine
+        def go():
+            with self.assertRaises(asyncio.TimeoutError):
+                with asyncio.timeout(0.01, loop=self.loop) as t:
+                    yield from long_running_task()
+                    self.assertIs(t._loop, self.loop)
+
+        self.loop.run_until_complete(go())
+        self.assertTrue(canceled_raised[0], 'CancelledError was not raised')
+
+    def test_timeout_finish_in_time(self):
+        @asyncio.coroutine
+        def long_running_task():
+            yield from asyncio.sleep(0.01, loop=self.loop)
+            return 'done'
+
+        @asyncio.coroutine
+        def go():
+            with asyncio.timeout(0.1, loop=self.loop):
+                resp = yield from long_running_task()
+            self.assertEqual(resp, 'done')
+
+        self.loop.run_until_complete(go())
+
+    def test_timeout_gloabal_loop(self):
+        asyncio.set_event_loop(self.loop)
+
+        @asyncio.coroutine
+        def run():
+            with asyncio.timeout(0.1) as t:
+                yield from asyncio.sleep(0.01)
+                self.assertIs(t._loop, self.loop)
+
+        self.loop.run_until_complete(run())
+
+    def test_timeout_not_relevant_exception(self):
+        @asyncio.coroutine
+        def go():
+            yield from asyncio.sleep(0, loop=self.loop)
+            with self.assertRaises(KeyError):
+                with asyncio.timeout(0.1, loop=self.loop):
+                    raise KeyError
+
+        self.loop.run_until_complete(go())
+
+    def test_timeout_canceled_error_is_converted_to_timeout(self):
+        @asyncio.coroutine
+        def go():
+            yield from asyncio.sleep(0, loop=self.loop)
+            with self.assertRaises(asyncio.CancelledError):
+                with asyncio.timeout(0.001, loop=self.loop):
+                    raise asyncio.CancelledError
+
+        self.loop.run_until_complete(go())
+
+    def test_timeout_blocking_loop(self):
+        @asyncio.coroutine
+        def long_running_task():
+            time.sleep(0.05)
+            return 'done'
+
+        @asyncio.coroutine
+        def go():
+            with asyncio.timeout(0.01, loop=self.loop):
+                result = yield from long_running_task()
+            self.assertEqual(result, 'done')
+
+        self.loop.run_until_complete(go())
+
+    def test_for_race_conditions(self):
+        fut = asyncio.Future(loop=self.loop)
+        self.loop.call_later(0.1, fut.set_result('done'))
+
+        @asyncio.coroutine
+        def go():
+            with asyncio.timeout(0.2, loop=self.loop):
+                resp = yield from fut
+            self.assertEqual(resp, 'done')
+
+        self.loop.run_until_complete(go())
+
+    def test_timeout_time(self):
+        @asyncio.coroutine
+        def go():
+            foo_running = None
+
+            start = self.loop.time()
+            with self.assertRaises(asyncio.TimeoutError):
+                with asyncio.timeout(0.1, loop=self.loop):
+                    foo_running = True
+                    try:
+                        yield from asyncio.sleep(0.2, loop=self.loop)
+                    finally:
+                        foo_running = False
+
+            dt = self.loop.time() - start
+            self.assertTrue(0.09 < dt < 0.11, dt)
+            self.assertFalse(foo_running)
+
+        self.loop.run_until_complete(go())
+
+    def test_raise_runtimeerror_if_no_task(self):
+        with self.assertRaises(RuntimeError):
+            with asyncio.timeout(0.1, loop=self.loop):
+                pass
+
+    def test_outer_coro_is_not_cancelled(self):
+
+        has_timeout = [False]
+
+        @asyncio.coroutine
+        def outer():
+            try:
+                with asyncio.timeout(0.001, loop=self.loop):
+                    yield from asyncio.sleep(1, loop=self.loop)
+            except asyncio.TimeoutError:
+                has_timeout[0] = True
+
+        @asyncio.coroutine
+        def go():
+            task = asyncio.ensure_future(outer(), loop=self.loop)
+            yield from task
+            self.assertTrue(has_timeout[0])
+            self.assertFalse(task.cancelled())
+            self.assertTrue(task.done())
+
+        self.loop.run_until_complete(go())
+
+    def test_cancel_outer_coro(self):
+        fut = asyncio.Future(loop=self.loop)
+
+        @asyncio.coroutine
+        def outer():
+            fut.set_result(None)
+            yield from asyncio.sleep(1, loop=self.loop)
+
+        @asyncio.coroutine
+        def go():
+            task = asyncio.ensure_future(outer(), loop=self.loop)
+            yield from fut
+            task.cancel()
+            with self.assertRaises(asyncio.CancelledError):
+                yield from task
+            self.assertTrue(task.cancelled())
+            self.assertTrue(task.done())
+
+        self.loop.run_until_complete(go())
+
 if __name__ == '__main__':
     unittest.main()
index ae9301a108858a8e067911d930942054c9f192e1..9b677e522c3ea31489b8e39df04aee0a0f08f17d 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -25,6 +25,11 @@ Library
 - Issue #24120: Ignore PermissionError when traversing a tree with
   pathlib.Path.[r]glob().  Patch by Ulrich Petri.
 
+- Skip getaddrinfo if host is already resolved.
+  Patch by A. Jesse Jiryu Davis.
+
+- Add asyncio.timeout() context manager.
+
 
 What's New in Python 3.4.4?
 ===========================