]> granicus.if.org Git - python/commitdiff
bpo-32355: Optimize asyncio.gather() (#4913)
authorYury Selivanov <yury@magic.io>
Tue, 19 Dec 2017 12:19:53 +0000 (07:19 -0500)
committerGitHub <noreply@github.com>
Tue, 19 Dec 2017 12:19:53 +0000 (07:19 -0500)
Lib/asyncio/base_events.py
Lib/asyncio/tasks.py
Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst [new file with mode: 0644]

index bd5bb32302a3f392ddbe6d33db533b23584d05c6..a7f8edd8cfd478342602e34c472aba708d328dcd 100644 (file)
@@ -139,11 +139,12 @@ def _ipaddr_info(host, port, family, type, proto):
 
 
 def _run_until_complete_cb(fut):
-    exc = fut._exception
-    if isinstance(exc, BaseException) and not isinstance(exc, Exception):
-        # Issue #22429: run_forever() already finished, no need to
-        # stop it.
-        return
+    if not fut.cancelled():
+        exc = fut.exception()
+        if isinstance(exc, BaseException) and not isinstance(exc, Exception):
+            # Issue #22429: run_forever() already finished, no need to
+            # stop it.
+            return
     fut._loop.stop()
 
 
index 275141c65e7e224edbd61a9f5fc26d4882452db3..ff8a486b544c942f8a5f49fc1bd2139d086d2b3b 100644 (file)
@@ -575,8 +575,7 @@ class _GatheringFuture(futures.Future):
 
 
 def gather(*coros_or_futures, loop=None, return_exceptions=False):
-    """Return a future aggregating results from the given coroutines
-    or futures.
+    """Return a future aggregating results from the given coroutines/futures.
 
     Coroutines will be wrapped in a future and scheduled in the event
     loop. They will not necessarily be scheduled in the same order as
@@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
         outer.set_result([])
         return outer
 
-    arg_to_fut = {}
-    for arg in set(coros_or_futures):
-        if not futures.isfuture(arg):
-            fut = ensure_future(arg, loop=loop)
-            if loop is None:
-                loop = fut._loop
-            # The caller cannot control this future, the "destroy pending task"
-            # warning should not be emitted.
-            fut._log_destroy_pending = False
-        else:
-            fut = arg
-            if loop is None:
-                loop = fut._loop
-            elif fut._loop is not loop:
-                raise ValueError("futures are tied to different event loops")
-        arg_to_fut[arg] = fut
-
-    children = [arg_to_fut[arg] for arg in coros_or_futures]
-    nchildren = len(children)
-    outer = _GatheringFuture(children, loop=loop)
-    nfinished = 0
-    results = [None] * nchildren
-
-    def _done_callback(i, fut):
+    def _done_callback(fut):
         nonlocal nfinished
+        nfinished += 1
+
         if outer.done():
             if not fut.cancelled():
                 # Mark exception retrieved.
                 fut.exception()
             return
 
-        if fut.cancelled():
-            res = futures.CancelledError()
-            if not return_exceptions:
-                outer.set_exception(res)
-                return
-        elif fut._exception is not None:
-            res = fut.exception()  # Mark exception retrieved.
-            if not return_exceptions:
-                outer.set_exception(res)
+        if not return_exceptions:
+            if fut.cancelled():
+                # Check if 'fut' is cancelled first, as
+                # 'fut.exception()' will *raise* a CancelledError
+                # instead of returning it.
+                exc = futures.CancelledError()
+                outer.set_exception(exc)
                 return
-        else:
-            res = fut._result
-        results[i] = res
-        nfinished += 1
-        if nfinished == nchildren:
+            else:
+                exc = fut.exception()
+                if exc is not None:
+                    outer.set_exception(exc)
+                    return
+
+        if nfinished == nfuts:
+            # All futures are done; create a list of results
+            # and set it to the 'outer' future.
+            results = []
+
+            for fut in children:
+                if fut.cancelled():
+                    # Check if 'fut' is cancelled first, as
+                    # 'fut.exception()' will *raise* a CancelledError
+                    # instead of returning it.
+                    res = futures.CancelledError()
+                else:
+                    res = fut.exception()
+                    if res is None:
+                        res = fut.result()
+                results.append(res)
+
             outer.set_result(results)
 
-    for i, fut in enumerate(children):
-        fut.add_done_callback(functools.partial(_done_callback, i))
+    arg_to_fut = {}
+    children = []
+    nfuts = 0
+    nfinished = 0
+    for arg in coros_or_futures:
+        if arg not in arg_to_fut:
+            fut = ensure_future(arg, loop=loop)
+            if loop is None:
+                loop = fut._loop
+            if fut is not arg:
+                # 'arg' was not a Future, therefore, 'fut' is a new
+                # Future created specifically for 'arg'.  Since the caller
+                # can't control it, disable the "destroy pending task"
+                # warning.
+                fut._log_destroy_pending = False
+
+            nfuts += 1
+            arg_to_fut[arg] = fut
+            fut.add_done_callback(_done_callback)
+
+        else:
+            # There's a duplicate Future object in coros_or_futures.
+            fut = arg_to_fut[arg]
+
+        children.append(fut)
+
+    outer = _GatheringFuture(children, loop=loop)
     return outer
 
 
diff --git a/Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst b/Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst
new file mode 100644 (file)
index 0000000..ca908e9
--- /dev/null
@@ -0,0 +1 @@
+Optimize asyncio.gather(); now up to 15% faster.