◐ Shell
reader mode source ↗
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
File filter
Conversations
Jump to
Diff view
Apply and reload
Show whitespace
Diff view
Apply and reload
26 changes: 19 additions & 7 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,24 @@ def _create_and_install_waiters(fs, return_when):
return waiter


def _yield_and_decref(fs, ref_collect):
"""
Iterate on the list *fs*, yielding objects one by one in reverse order.
Before yielding an object, it is removed from each set in
the collection of sets *ref_collect*.
"""
while fs:
for futures_set in ref_collect:
futures_set.remove(fs[-1])
# Careful not to keep a reference to the popped value
yield fs.pop()

Expand Down Expand Up @@ -216,7 +225,8 @@ def as_completed(fs, timeout=None):
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try:
yield from _yield_and_decref(finished, ref_collect=(fs,))

while pending:
if timeout is None:
@@ -237,9 +247,11 @@ def as_completed(fs, timeout=None):

# reverse to keep finishing order
finished.reverse()
yield from _yield_and_decref(finished, ref_collect=(fs, pending))

finally:
for f in fs:
with f._condition:
f._waiters.remove(waiter)
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def test_free_reference_yielded_future(self):
# to finished futures.
futures_list = [Future() for _ in range(8)]
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
futures_list.append(create_future(state=SUCCESSFUL_FUTURE))

with self.assertRaises(futures.TimeoutError):
for future in futures.as_completed(futures_list, timeout=0):
Expand Down
Toggle all file notes Toggle all file annotations