gh-131466: `concurrent.futures.Executor.map`: avoid temporarily exceeding `buffersize` while collecting the next result by ebonnal · Pull Request #131467 · python/cpython
Context recap:
If we have:
results: Iterator = executor.map(fn, iterable, buffersize=buffersize)
What happens when calling next(results):
- fetch the next
argfrominterableand put a task forfn(arg)in the buffer - wait for the next result to be available
- yield the collected result
-> During step 2. there is buffersize + 1 buffered tasks.
This PR swaps steps 1. and 2. so that buffersize is never exceeded, even during next.
ebonnal
changed the title
gh-131466:
gh-131466: concurrent.futures.Executor.map: avoid temporarily exceeding buffersize while collecting the next resultconcurrent.futures.Executor.map: avoid temporarily exceeding buffersize while collecting the next result
| yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) | ||
|
|
||
| # Yield the awaited result | ||
| yield fs.pop().result() |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to be discussed: this could be replaced by a lighter yield fs.pop()._result because the prior call to _result_or_cancel guarantees that at this point the result is available.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I understand that we could possibly exceed buffersize while collecting the next result, is there a real-word use case where it would really cause an issue? the reason is that we access to fs[-1] and then do fs.pop().
I see that have a del fut in _result_or_cancel() but can you confirm that it's sufficient to not hold any reference to the yet-to-be-popped future?
@picnixz sorry I re-asked your review because you made me realize that we actually don't need _result_or_cancel anymore:
test_executor_map_current_future_cancel introduced in #95169 does not break anymore because now if the fs[-1].result() access fails, the future is still in fs (not popped out like before) and it will be properly cancelled as part of the result_iterator's finally block.
I'm digging deeper into #95169 's context to check if I miss any non-tested scenario, especially regarding this:
finally:
# Break a reference cycle with the exception in self._exception
del fut
especially regarding this:
yes, that's what I wanted to ask, but I'm not an expert here so i'll let you investigate first c:
@ebonnal Sorry for the late reply. What about this simpler and IMHO cleaner way below? The second to last line may be a bit controversial (it changes the type of a variable), but I've used that list-pop trick in my mpi4py.futures module to avoid keeping references to objects.
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d98b1ebdd58..de34b86d1ee 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -625,21 +625,26 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # before the first iterator value is required. def result_iterator(): try: + result = None # reverse to keep finishing order fs.reverse() while fs: + # Careful not to keep a reference to the popped future + if timeout is None: + result = _result_or_cancel(fs.pop()) + else: + result = _result_or_cancel(fs.pop(), end_time - time.monotonic()) if ( buffersize and (executor := executor_weakref()) and (args := next(zipped_iterables, None)) ): fs.appendleft(executor.submit(fn, *args)) - # Careful not to keep a reference to the popped future - if timeout is None: - yield _result_or_cancel(fs.pop()) - else: - yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) + # Careful not to keep a reference to the result + result = [result] + yield result.pop() finally: + del result for future in fs: future.cancel() return result_iterator()
Thank you for taking a look @dalcinl !
What about this simpler and IMHO cleaner way below?
What exactly do you find unclean in my proposal and justifying a list creation for each yielded element? (fun trick though!)
What exactly do you find unclean in my proposal and justifying a list creation for each yielded element
I guess it is just a matter of subjective taste, my patch looks slightly shorter, but I should say that the primary motivation was avoiding the use of the (conventionally) private _result attribute. The creation of a list with one element is as fast as an attribute lookup, so you can hardly notice any overhead because of it.
I'm biased, as I maintain an custom implementation of this routine, and I prefer to avoid the use of private APIs and attributes. Standard library modules may not be bound to such constraints.
Long story short, I believe both your proposal and mine are functionally equivalent, so FWIW, this PR has my +1.
the primary motivation was avoiding the use of the (conventionally) private _result attribute
I thought that was acceptable because Future and Executor are defined in the same module (_base.py) and I found a lot of other example in the std lib where private attributes are considered more as "module private" rather than "class private" 🤔.
The creation of a list with one element is as fast as an attribute lookup, so you can hardly notice any overhead because of it
Fair, actually I remember now that another alternative I had considered in the early days of this PR was:
result = deque()
while fs:
...
result.append(fs.pop().result())
...
yield result.pop()
Which is similar to your approach but reuses the same container (a deque for the append/pop performance).
I believe both your proposal and mine are functionally equivalent, so FWIW, this PR has my +1.
Thanks again for your review, I appreciate it, let's wait and gather more feedback 👀 !
Fair, actually I remember now that another alternative I had considered in the early days of this PR was:
I looks even better!! I'll borrow your approach for my own code. I you ever update this PR, please do not forget the del result or result.clear() in the finally block.