return waiter
-def _yield_and_decref(fs, ref_collect):
+def _yield_finished_futures(fs, waiter, ref_collect):
"""
- Iterate on the list *fs*, yielding objects one by one in reverse order.
- Before yielding an object, it is removed from each set in
- the collection of sets *ref_collect*.
+ Iterate on the list *fs*, yielding finished futures one by one in
+ reverse order.
+ Before yielding a future, *waiter* is removed from its waiters
+ and the future is removed from each set in the collection of sets
+ *ref_collect*.
+
+ The aim of this function is to avoid keeping stale references after
+ the future is yielded and before the iterator resumes.
"""
while fs:
+ f = fs[-1]
for futures_set in ref_collect:
- futures_set.remove(fs[-1])
+ futures_set.remove(f)
+ with f._condition:
+ f._waiters.remove(waiter)
+ del f
# Careful not to keep a reference to the popped value
yield fs.pop()
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try:
- yield from _yield_and_decref(finished, ref_collect=(fs,))
+ yield from _yield_finished_futures(finished, waiter,
+ ref_collect=(fs,))
while pending:
if timeout is None:
# reverse to keep finishing order
finished.reverse()
- yield from _yield_and_decref(finished, ref_collect=(fs, pending))
+ yield from _yield_finished_futures(finished, waiter,
+ ref_collect=(fs, pending))
finally:
+ # Remove waiter from unfinished futures
for f in fs:
with f._condition:
f._waiters.remove(waiter)
# to finished futures.
futures_list = [Future() for _ in range(8)]
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
- futures_list.append(create_future(state=SUCCESSFUL_FUTURE))
+ futures_list.append(create_future(state=FINISHED, result=42))
with self.assertRaises(futures.TimeoutError):
for future in futures.as_completed(futures_list, timeout=0):