]> granicus.if.org Git - python/commitdiff
asyncio: Change as_completed() to use a Queue, to avoid O(N**2) behavior. Fixes issue...
authorGuido van Rossum <guido@python.org>
Thu, 13 Feb 2014 01:58:19 +0000 (17:58 -0800)
committerGuido van Rossum <guido@python.org>
Thu, 13 Feb 2014 01:58:19 +0000 (17:58 -0800)
Lib/asyncio/tasks.py
Lib/test/test_asyncio/test_tasks.py

index 81a125f44d77f96d5dc9d063011bbb376012cbf6..b7ee758d64081b28c8c6c8bc2dcd9cc85770f3d0 100644 (file)
@@ -463,7 +463,11 @@ def _wait(fs, timeout, return_when, loop):
 
 # This is *not* a @coroutine!  It is just an iterator (yielding Futures).
 def as_completed(fs, *, loop=None, timeout=None):
-    """Return an iterator whose values, when waited for, are Futures.
+    """Return an iterator whose values are coroutines.
+
+    When waiting for the yielded coroutines you'll get the results (or
+    exceptions!) of the original Futures (or coroutines), in the order
+    in which and as soon as they complete.
 
     This differs from PEP 3148; the proper way to use this is:
 
@@ -471,8 +475,8 @@ def as_completed(fs, *, loop=None, timeout=None):
             result = yield from f  # The 'yield from' may raise.
             # Use result.
 
-    Raises TimeoutError if the timeout occurs before all Futures are
-    done.
+    If a timeout is specified, the 'yield from' will raise
+    TimeoutError when the timeout occurs before all Futures are done.
 
     Note: The futures 'f' are not necessarily members of fs.
     """
@@ -481,27 +485,36 @@ def as_completed(fs, *, loop=None, timeout=None):
     loop = loop if loop is not None else events.get_event_loop()
     deadline = None if timeout is None else loop.time() + timeout
     todo = {async(f, loop=loop) for f in set(fs)}
-    completed = collections.deque()
+    from .queues import Queue  # Import here to avoid circular import problem.
+    done = Queue(loop=loop)
+    timeout_handle = None
+
+    def _on_timeout():
+        for f in todo:
+            f.remove_done_callback(_on_completion)
+            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
+        todo.clear()  # Can't do todo.remove(f) in the loop.
+
+    def _on_completion(f):
+        if not todo:
+            return  # _on_timeout() was here first.
+        todo.remove(f)
+        done.put_nowait(f)
+        if not todo and timeout_handle is not None:
+            timeout_handle.cancel()
 
     @coroutine
     def _wait_for_one():
-        while not completed:
-            timeout = None
-            if deadline is not None:
-                timeout = deadline - loop.time()
-                if timeout < 0:
-                    raise futures.TimeoutError()
-            done, pending = yield from _wait(
-                todo, timeout, FIRST_COMPLETED, loop)
-            # Multiple callers might be waiting for the same events
-            # and getting the same outcome.  Dedupe by updating todo.
-            for f in done:
-                if f in todo:
-                    todo.remove(f)
-                    completed.append(f)
-        f = completed.popleft()
-        return f.result()  # May raise.
+        f = yield from done.get()
+        if f is None:
+            # Dummy value from _on_timeout().
+            raise futures.TimeoutError
+        return f.result()  # May raise f.exception().
 
+    for f in todo:
+        f.add_done_callback(_on_completion)
+    if todo and timeout is not None:
+        timeout_handle = loop.call_later(timeout, _on_timeout)
     for _ in range(len(todo)):
         yield _wait_for_one()
 
index 6847de04712945aa879c8497be1fe822ca1fc644..024dd2ead51531418dcb981de73a79e0cd7c57fa 100644 (file)
@@ -779,7 +779,6 @@ class TaskTests(unittest.TestCase):
             yield 0
             yield 0
             yield 0.1
-            yield 0.02
 
         loop = test_utils.TestLoop(gen)
         self.addCleanup(loop.close)
@@ -791,6 +790,8 @@ class TaskTests(unittest.TestCase):
         def foo():
             values = []
             for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
+                if values:
+                    loop.advance_time(0.02)
                 try:
                     v = yield from f
                     values.append((1, v))
@@ -809,6 +810,26 @@ class TaskTests(unittest.TestCase):
         loop.advance_time(10)
         loop.run_until_complete(asyncio.wait([a, b], loop=loop))
 
+    def test_as_completed_with_unused_timeout(self):
+
+        def gen():
+            yield
+            yield 0
+            yield 0.01
+
+        loop = test_utils.TestLoop(gen)
+        self.addCleanup(loop.close)
+
+        a = asyncio.sleep(0.01, 'a', loop=loop)
+
+        @asyncio.coroutine
+        def foo():
+            for f in asyncio.as_completed([a], timeout=1, loop=loop):
+                v = yield from f
+                self.assertEqual(v, 'a')
+
+        res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
+
     def test_as_completed_reverse_wait(self):
 
         def gen():