From: Antoine Pitrou Date: Sun, 3 Sep 2017 13:09:23 +0000 (+0200) Subject: Fix a c.f.as_completed() refleak previously introduced in bpo-27144 (#3270) X-Git-Tag: v3.7.0a1~189 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=2ef37607b7aacb7c750d008b9113fe11f96163c0;p=python Fix a c.f.as_completed() refleak previously introduced in bpo-27144 (#3270) --- diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 88521ae317..70c7b61959 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -171,15 +171,24 @@ def _create_and_install_waiters(fs, return_when): return waiter -def _yield_and_decref(fs, ref_collect): +def _yield_finished_futures(fs, waiter, ref_collect): """ - Iterate on the list *fs*, yielding objects one by one in reverse order. - Before yielding an object, it is removed from each set in - the collection of sets *ref_collect*. + Iterate on the list *fs*, yielding finished futures one by one in + reverse order. + Before yielding a future, *waiter* is removed from its waiters + and the future is removed from each set in the collection of sets + *ref_collect*. + + The aim of this function is to avoid keeping stale references after + the future is yielded and before the iterator resumes. """ while fs: + f = fs[-1] for futures_set in ref_collect: - futures_set.remove(fs[-1]) + futures_set.remove(f) + with f._condition: + f._waiters.remove(waiter) + del f # Careful not to keep a reference to the popped value yield fs.pop() @@ -216,7 +225,8 @@ def as_completed(fs, timeout=None): waiter = _create_and_install_waiters(fs, _AS_COMPLETED) finished = list(finished) try: - yield from _yield_and_decref(finished, ref_collect=(fs,)) + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs,)) while pending: if timeout is None: @@ -237,9 +247,11 @@ def as_completed(fs, timeout=None): # reverse to keep finishing order finished.reverse() - yield from _yield_and_decref(finished, ref_collect=(fs, pending)) + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs, pending)) finally: + # Remove waiter from unfinished futures for f in fs: with f._condition: f._waiters.remove(waiter) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index f1226fe709..03f8d1d711 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -405,7 +405,7 @@ class AsCompletedTests: # to finished futures. futures_list = [Future() for _ in range(8)] futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) - futures_list.append(create_future(state=SUCCESSFUL_FUTURE)) + futures_list.append(create_future(state=FINISHED, result=42)) with self.assertRaises(futures.TimeoutError): for future in futures.as_completed(futures_list, timeout=0):