]> granicus.if.org Git - python/commitdiff
Removes an inefficient spin loop in as_completed
authorBrian Quinlan <brian@sweetapp.com>
Wed, 17 Nov 2010 11:06:29 +0000 (11:06 +0000)
committerBrian Quinlan <brian@sweetapp.com>
Wed, 17 Nov 2010 11:06:29 +0000 (11:06 +0000)
Lib/concurrent/futures/_base.py

index 8136d89dc9026afe3ccc2619f269259348fe564b..0bbb85bd462033a42599329ab03d549460f3c967 100644 (file)
@@ -12,6 +12,7 @@ import time
 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
 ALL_COMPLETED = 'ALL_COMPLETED'
+_AS_COMPLETED = '_AS_COMPLETED'
 
 # Possible future states (for internal use by the futures package).
 PENDING = 'PENDING'
@@ -70,8 +71,30 @@ class _Waiter(object):
     def add_cancelled(self, future):
         self.finished_futures.append(future)
 
+class _AsCompletedWaiter(_Waiter):
+    """Used by as_completed()."""
+
+    def __init__(self):
+        super(_AsCompletedWaiter, self).__init__()
+        self.lock = threading.Lock()
+
+    def add_result(self, future):
+        with self.lock:
+            super(_AsCompletedWaiter, self).add_result(future)
+            self.event.set()
+
+    def add_exception(self, future):
+        with self.lock:
+            super(_AsCompletedWaiter, self).add_exception(future)
+            self.event.set()
+
+    def add_cancelled(self, future):
+        with self.lock:
+            super(_AsCompletedWaiter, self).add_cancelled(future)
+            self.event.set()
+
 class _FirstCompletedWaiter(_Waiter):
-    """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
+    """Used by wait(return_when=FIRST_COMPLETED)."""
 
     def add_result(self, future):
         super().add_result(future)
@@ -128,7 +151,9 @@ class _AcquireFutures(object):
             future._condition.release()
 
 def _create_and_install_waiters(fs, return_when):
-    if return_when == FIRST_COMPLETED:
+    if return_when == _AS_COMPLETED:
+        waiter = _AsCompletedWaiter()
+    elif return_when == FIRST_COMPLETED:
         waiter = _FirstCompletedWaiter()
     else:
         pending_count = sum(
@@ -171,7 +196,7 @@ def as_completed(fs, timeout=None):
                 f for f in fs
                 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
         pending = set(fs) - finished
-        waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
+        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
 
     try:
         for future in finished:
@@ -187,11 +212,15 @@ def as_completed(fs, timeout=None):
                             '%d (of %d) futures unfinished' % (
                             len(pending), len(fs)))
 
-            waiter.event.wait(timeout)
+            waiter.event.wait(wait_timeout)
+
+            with waiter.lock:
+                finished = waiter.finished_futures
+                waiter.finished_futures = []
+                waiter.event.clear()
 
-            for future in waiter.finished_futures[:]:
+            for future in finished:
                 yield future
-                waiter.finished_futures.remove(future)
                 pending.remove(future)
 
     finally: