gh-74028: `concurrent.futures.Executor.map`: introduce `buffersize` param for lazier behavior by ebonnal · Pull Request #125663 · python/cpython
Hi @Zheaoli
For me, the basic map API's behavior is when we put an infinite iterator, the result would be infinite and only stop when the iterator has been stoped. I think we need to keep the same behavior between map and executor.map
There may be a misunderstanding here, the goal of this PR is precisely to make Executor.map closer to the builtin map behavior, i.e. make it lazier. (map and current executor.map do not have the same behavior)
I will recap the behaviors so that everybody is on the same page:
built-in map
infinite_iterator = itertools.count(0) # a `map` instance is created and the func and iterable are just stored as attributes mapped_iterator = map(str, infinite_iterator) # retrieves the first element of its input iterator, applies # the transformation and returns the result assert next(mapped_iterator) == "0" # the next element in the input iterator is the 2nd assert next(infinite_iterator) == 1 # one can next infinitely assert next(mapped_iterator) == "2" assert next(mapped_iterator) == "3" assert next(mapped_iterator) == "4" assert next(mapped_iterator) == "5" ...
Executor.map without buffersize (= current Executor.map)
infinite_iterator = itertools.count(0) # this line runs FOREVER, trying to iterate over input iterator until exhaustion mapped_iterator = executor.map(str, infinite_iterator)
⏫ this line will run forever because it collects the entire input iterable eagerly, in order to build the entire future results list fs = [self.submit(fn, *args) for args in zip(*iterables)] which requires infinite time and memory.
Executor.map with buffersize
infinite_iterator = itertools.count(0) # retrieves the first 2 elements (=buffersize) and submits 2 tasks for them mapped_iterator = executor.map(str, infinite_iterator, buffersize=2) # retrieves the 3rd element of input iterator and submits a task for it, # then wait for the oldest future in the buffer to complete and returns the result assert next(mapped_iterator) == "0" # the next element of the input iterator is the 4th assert next(infinite_iterator) == 3 # one can next infinitely while only a buffer of finite not-yet-yielded future results is kept in memory assert next(mapped_iterator) == "1" assert next(mapped_iterator) == "2" assert next(mapped_iterator) == "4" assert next(mapped_iterator) == "5" ...
note
I used the example of an infinite input iterator because this is an example where current Executor.map is just unusable at all. But even for finite input iterables, if a developer writes mapped_iterator = executor.map(fn, iterable), they often don’t want the iterable to be eagerly exhausted right away, but rather to be iterated at the same rate as mapped_iterator. This PR's proposal is to allow them to do so by setting a buffersize.