
gh-74028: `concurrent.futures.Executor.map`: introduce `buffersize` param for lazier behavior by ebonnal · Pull Request #125663 · python/cpython

Hi @Zheaoli

For me, the basic map API's behavior is that when we pass an infinite iterator, the result is infinite and only stops when the iterator has stopped. I think we need to keep the same behavior between map and executor.map

There may be a misunderstanding here: the goal of this PR is precisely to bring Executor.map closer to the built-in map's behavior, i.e. to make it lazier. (map and the current Executor.map do not behave the same way.)

I will recap the behaviors so that everybody is on the same page:

built-in map

import itertools

infinite_iterator = itertools.count(0)

# a `map` instance is created and the func and iterable are just stored as attributes
mapped_iterator = map(str, infinite_iterator)

# retrieves the first element of its input iterator, applies
# the transformation and returns the result
assert next(mapped_iterator) == "0" 

# the next element in the input iterator is the 2nd
assert next(infinite_iterator) == 1

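# next() can be called indefinitely; "1" is skipped because
# that element was consumed directly from the input iterator above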
assert next(mapped_iterator) == "2"
assert next(mapped_iterator) == "3" 
assert next(mapped_iterator) == "4" 
assert next(mapped_iterator) == "5" 
...

Executor.map without buffersize (= current Executor.map)

infinite_iterator = itertools.count(0)

# this line runs FOREVER, trying to iterate over the input iterator until exhaustion
mapped_iterator = executor.map(str, infinite_iterator)

⏫ This line runs forever because it eagerly collects the entire input iterable in order to build the full list of futures, `fs = [self.submit(fn, *args) for args in zip(*iterables)]`, which requires infinite time and memory.
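The eager behavior can also be observed with a finite input. The following is a minimal sketch, assuming a `ThreadPoolExecutor`; the `tracked` generator and the `consumed` list are hypothetical names introduced only to record when inputs are pulled.

```python
from concurrent.futures import ThreadPoolExecutor

consumed = []  # hypothetical log of the inputs pulled so far


def tracked(n):
    """Yield 0..n-1 and record each element as it is pulled."""
    for i in range(n):
        consumed.append(i)
        yield i


with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(str, tracked(5))
    # No result has been requested yet, but without buffersize the whole
    # input has already been consumed to build the list of futures.
    pulled_before_iteration = len(consumed)
    out = list(results)
```

Here `pulled_before_iteration` equals the full input length, which is why an infinite input never lets the call return.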

Executor.map with buffersize

infinite_iterator = itertools.count(0)

# retrieves the first 2 elements (=buffersize) and submits 2 tasks for them
mapped_iterator = executor.map(str, infinite_iterator, buffersize=2)

# retrieves the 3rd element of the input iterator and submits a task for it,
# then waits for the oldest future in the buffer to complete and returns its result
assert next(mapped_iterator) == "0" 

# the next element of the input iterator is the 4th
assert next(infinite_iterator) == 3

# next() can be called indefinitely while only a finite buffer of
# not-yet-yielded futures is kept in memory; "3" is skipped because
# that element was consumed directly from the input iterator above
assert next(mapped_iterator) == "1" 
assert next(mapped_iterator) == "2" 
assert next(mapped_iterator) == "4"
assert next(mapped_iterator) == "5" 
...
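The buffering described above can be approximated outside the standard library. This is only a sketch under stated assumptions, not the PR's implementation: `buffered_map` is a hypothetical helper, it handles a single iterable, and it ignores timeouts and cancellation.

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor


def buffered_map(executor, fn, iterable, buffersize):
    """Hypothetical helper: lazily map fn over iterable, in input order."""
    iterator = iter(iterable)
    # Submit only the first `buffersize` tasks up front.
    futures = collections.deque(
        executor.submit(fn, item)
        for item in itertools.islice(iterator, buffersize)
    )

    def result_iterator():
        while futures:
            # Pull at most one new input and submit it, so the buffer is
            # refilled before waiting on the oldest future.
            for item in itertools.islice(iterator, 1):
                futures.append(executor.submit(fn, item))
            yield futures.popleft().result()

    return result_iterator()


with ThreadPoolExecutor(max_workers=2) as executor:
    source = itertools.count(0)
    mapped = buffered_map(executor, str, source, buffersize=2)
    first = next(mapped)       # inputs 0, 1 and 2 have been pulled so far
    next_input = next(source)  # the input iterator is now at its 4th element
    following = [next(mapped) for _ in range(3)]
```

A deque is used so that the oldest future is popped from one end while new ones are appended at the other, which keeps the number of pending futures bounded.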

note

I used an infinite input iterator because it is a case where the current Executor.map is simply unusable. But even for finite input iterables, a developer who writes mapped_iterator = executor.map(fn, iterable) often doesn't want the iterable to be exhausted eagerly, but rather to be iterated at the same rate as mapped_iterator. This PR proposes to allow that by setting a buffersize.