
bpo-29595: Expose max_queue_size in ThreadPoolExecutor#143

Closed
prayerslayer wants to merge 10 commits into
python:masterfrom
prayerslayer:expose-max-queue-size-in-threadpoolexecutor


@prayerslayer

@prayerslayer prayerslayer commented Feb 17, 2017


Hi!

Please forgive this blunt pull request, I wanted to open an issue first and ask, but couldn't.

The situation I ran into recently was that I used ThreadPoolExecutor to parallelize AWS API calls; I had to move data from one S3 bucket to another (~150M objects). Contrary to what I expected, the underlying queue is unbounded by default (its maximum size defaults to 0). Thus my process ended up consuming gigabytes of memory, because it put items into the queue faster than the threads could work them off, and the queue just kept growing. (It ran on K8s and the pod was rightfully killed eventually.)

Of course there are ways to work around this. One could use more threads, to some extent. Or you could use your own queue with a defined maximum size. But I think that's more work for users of Python than necessary.

So this pull request exposes a max_queue_size parameter for the ThreadPoolExecutor which will be forwarded to queue.Queue(). It defaults to 0, so backward-compatibility is ensured. I am happy to add tests if you'd give me further instructions. I would've done this already, but I'm largely unfamiliar with this project as well as the language 😅
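A minimal sketch of the queue behavior described above, using only `queue.Queue` from the standard library (the variable names are illustrative and not part of the patch):

```python
import queue

# maxsize=0 (the default) means "no limit": puts always succeed.
unbounded = queue.Queue(maxsize=0)
for i in range(10_000):
    unbounded.put_nowait(i)
unbounded_size = unbounded.qsize()

# A positive maxsize caps the queue. A non-blocking put on a full
# queue raises queue.Full; a blocking put() would wait instead.
bounded = queue.Queue(maxsize=2)
bounded.put_nowait("a")
bounded.put_nowait("b")
try:
    bounded.put_nowait("c")
    overflowed = False
except queue.Full:
    overflowed = True

print(unbounded_size, bounded.qsize(), overflowed)
```

With a bounded queue, a producer that uses the blocking `put()` is slowed down to the pace of the consumers, which is the back-pressure this pull request is after.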

I hope you find this as useful as I would and am looking forward to reading your thoughts about it!

https://bugs.python.org/issue29595

@the-knights-who-say-ni


Hello, and thanks for your contribution!

I'm a bot set up to make sure that the project can legally accept your contribution by verifying you have signed the PSF contributor agreement (CLA).

Unfortunately we couldn't find an account corresponding to your GitHub username on bugs.python.org (b.p.o) to verify you have signed the CLA. This is necessary for legal reasons before we can look at your contribution. Please follow these steps to help rectify the issue:

  1. If you don't have an account on b.p.o, please create one
  2. Make sure your GitHub username is listed in "Your Details" at b.p.o
  3. If you have not already done so, please sign the PSF contributor agreement
  4. If you just signed the CLA, please wait at least one US business day and then check "Your Details" on bugs.python.org to see if your account has been marked as having signed the CLA (the delay is due to a person having to manually check your signed CLA)
  5. Reply here saying you have completed the above steps

Thanks again for your contribution and we look forward to looking at it!

@Mariatta

Member

Hi, can you create an issue about this in the issue tracker, https://bugs.python.org?
Thanks.

@prayerslayer

prayerslayer commented Feb 17, 2017

Author

I hope I did this right: http://bugs.python.org/issue29595

By the way I signed the CLA 🎉

@prayerslayer prayerslayer changed the title Expose max_queue_size in ThreadPoolExecutor Feb 17, 2017
@Winterflower


Although the keyword max_queue_size is rather self-explanatory, I wonder if there should be some simple checks to make sure no nonsensical values such as None are passed to the Queue constructor.

@prayerslayer

Author

Yes, that's a good point. I'd probably check for not None and > 0. Would you rather silently not set max_queue_size or raise an exception?

@Winterflower

Winterflower commented Feb 17, 2017


I'll let the more experienced devs weigh in on this, but imho 'explicit is better than implicit' and I'd prob raise an exception to alert the client code that an invalid value was passed for a keyword arg. If you fail fast, this kind of nonsensical argument will most likely be caught early instead of puzzling people when it causes bizarre behaviour in production (due to the implicit setting of keyword args).
I haven't looked at the tests for this, but I suppose if you wanted to test that your Queue is being set with the correct size, you can create a ThreadPoolExecutor instance, pass in a max_queue_size arg, then grab its self._work_queue and assert that the maxsize attribute on that is equal to what you passed in for max_queue_size.
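A rough sketch of the fail-fast validation and the `maxsize` check suggested here. `make_work_queue` and `check_rejects` are hypothetical helpers written for illustration, not code from the patch, and the sketch assumes that 0 keeps its `queue.Queue` meaning of "unbounded":

```python
import queue


def make_work_queue(max_queue_size=0):
    """Hypothetical helper: validate the size, then build the work queue."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(max_queue_size, bool) or not isinstance(max_queue_size, int):
        raise TypeError("max_queue_size must be an int")
    if max_queue_size < 0:
        raise ValueError("max_queue_size must be >= 0")
    return queue.Queue(maxsize=max_queue_size)


def check_rejects(value, expected_exc):
    """Return True if make_work_queue(value) raises expected_exc."""
    try:
        make_work_queue(value)
    except expected_exc:
        return True
    return False


# The test idea from the comment: the queue reports the size it was given.
print(make_work_queue(5).maxsize)
print(check_rejects(None, TypeError), check_rejects(-1, ValueError))
```

Without such a check, `queue.Queue` itself treats any `maxsize` less than or equal to zero as "infinite", so a negative value would silently disable the limit.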

@prayerslayer

Author

Updated the PR and added tests. Should I update the docs somewhere?

@prayerslayer

Author

Also, should I add the same feature to the ProcessPoolExecutor?

@DimitrisJim

Contributor

Since you've changed the signature of a class, the relevant docs need the required updates too :-)

I'd suggest not changing anything else until a core-dev also comes through and provides a review and an opinion on the change. If one doesn't come around for a while, "ping" the issue on the issue tracker to try and get attention to it.

@prayerslayer

Author

Updated the docs and waiting for a core dev.

@prayerslayer

Author

Fixed the build and pinging for authoritative feedback.

@prayerslayer

Author

Any updates? It's been a while and I also pinged it on the issue tracker to no avail...

@rhettinger

Contributor

If this feature is approved, we should give some thought to making it a keyword-only argument. That would improve code clarity and leave the positional arguments open for future expansion if needed. Also, if we ever need to make a change, keyword arguments are easier to deprecate than positional arguments (if other positional arguments had been added in the interim).
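To illustrate the keyword-only suggestion, here is a small sketch. `ExampleExecutor` is a hypothetical stand-in, not the real `ThreadPoolExecutor` signature:

```python
class ExampleExecutor:
    """Hypothetical stand-in for an executor; not the real ThreadPoolExecutor."""

    def __init__(self, max_workers=None, *, max_queue_size=0):
        # Everything after the bare "*" can only be passed by keyword.
        self.max_workers = max_workers
        self.max_queue_size = max_queue_size


ok = ExampleExecutor(4, max_queue_size=10)

try:
    ExampleExecutor(4, 10)  # positional use is rejected with TypeError
    positional_allowed = True
except TypeError:
    positional_allowed = False

print(ok.max_queue_size, positional_allowed)
```

Because callers must spell out `max_queue_size=`, new positional parameters can be added later without changing the meaning of existing calls.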

@pitrou

pitrou commented Apr 1, 2017

Member

I think this is a nice feature addition overall.

@pitrou

pitrou commented Apr 1, 2017

Member

> Also, should I add the same feature to the ProcessPoolExecutor?

Yes, that would be reasonable, as it is vulnerable to the same problem.

14 hidden items Load more…
@prayerslayer

Author

I think I have everything for the ThreadPoolExecutor, at least.

As for the ProcessPoolExecutor, it seems like the work_ids queue would be susceptible to growing too big; I can fix that later.

Regarding multiprocessing.Pool I can mechanically apply the same change, but I have really no idea what's going on there.

@pitrou

pitrou commented Oct 31, 2017

Member

I agree multiprocessing.Pool doesn't need to be tackled here. On the other hand, I'd like to see ProcessPoolExecutor updated, for consistency reasons and as we could then re-use the test case.

akruis pushed a commit to akruis/cpython that referenced this pull request Nov 7, 2017
…on#143

- avoid type punning / strict aliasing violations
- don't rely on implementation defined placement of bit-fields in the
storage unit.
(cherry picked from commit b9c243a)
akruis pushed a commit to akruis/cpython that referenced this pull request Dec 20, 2017
Stackless contributes two tests to builtins: TaskletExit and
TaskletExit.__init__. Therefore we have to adjust the limit.

Add missing changelog entries (python#143, python#144).
akruis pushed a commit to akruis/cpython that referenced this pull request Mar 25, 2018
…on#143

- avoid type punning / strict aliasing violations
- don't rely on implementation defined placement of bit-fields in the
storage unit.
akruis pushed a commit to akruis/cpython that referenced this pull request Mar 25, 2018
Stackless contributes two tests to builtins: TaskletExit and
TaskletExit.__init__. Therefore we have to adjust the limit.

Add missing changelog entries (python#143, python#144).

(cherry picked from commit 7327e4b)
@tomMoral

tomMoral commented Apr 2, 2018

Contributor

The recent changes by @pitrou have replaced the Queue with a SimpleQueue, which is unbounded, so this PR conflicts with master. The change is therefore no longer simple.

After some investigation, one possibility would be to revert to using a queue.Queue and modify the underlying mutex to be an RLock. This way, the gc could still run while the lock is acquired without trouble. This change does not seem to break the API of queue.Queue, as the internal state of the queue would still be updated atomically in one thread (the gc being allowed only in this same thread). What do you think about that? We could also add a mutex parameter to the Queue to avoid changing the behavior everywhere, but this means that the same gc issue can occur everywhere.

For the API change, my only concern is that the submit function becomes a blocking function in this case, and that we should probably also implement some mechanism to return when the queue is full. This makes the code more complex.

@prayerslayer are you still interested in working on this issue? If not, let me know, I am interested in taking over from here.

@pitrou

pitrou commented Apr 2, 2018

Member

> After some investigation, one possibility would be to revert to using a queue.Queue and modify the underlying mutex to be a Rlock. This way, the gc could still be run while the lock is acquired without troubles.

Using a RLock does not make a routine signal-safe. The typical situation is:

  • you enter a function protected by a RLock (e.g. Queue.put); the function takes the RLock and starts modifying some internal state
  • a signal arrives and interrupts code execution
  • the signal handler, through some chain of events, re-enters the RLock protected function (in this case: the signal handler triggers something that puts something on the queue)
  • the protected function takes the RLock again; it sees the internal state in a half-modified inconsistent state: who knows what can happen?

(you can replace "signal" with "cyclic garbage collection" above for the same effect)

In other words, the problem is not the lock, but the operations that are protected by the lock.

SimpleQueue was designed specifically for SimpleQueue.put to be safe in that situation. Queue probably isn't, especially with a non-zero maxsize.
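The re-entrancy scenario in the list above can be simulated deterministically. This is a toy sketch, not `queue.Queue` internals: `TwoStepCounter` and its `interrupt` hook are made up for illustration, with the hook standing in for a signal handler or a GC callback that fires in the middle of the protected section.

```python
import threading


class TwoStepCounter:
    """Toy structure whose invariant is: count == len(items)."""

    def __init__(self):
        self._lock = threading.RLock()
        self.items = []
        self.count = 0
        self.observed = []  # (count, len(items)) pairs seen on entry

    def put(self, item, interrupt=None):
        with self._lock:  # an RLock lets the same thread re-enter
            self.observed.append((self.count, len(self.items)))
            self.count += 1  # step 1 of the update
            if interrupt is not None:
                interrupt()  # simulated signal handler / GC callback
            self.items.append(item)  # step 2 of the update


c = TwoStepCounter()
# The "interrupt" re-enters put() between step 1 and step 2.
c.put("outer", interrupt=lambda: c.put("inner"))

print(c.observed)  # the second entry saw count=1 but len(items)=0
```

The RLock prevents a deadlock, but the nested call still runs while the invariant is broken, which is the point made above: the problem is the operations protected by the lock, not the lock itself.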

@tomMoral

tomMoral commented Apr 2, 2018

Contributor

> you can replace "signal" with "cyclic garbage collection" above for the same effect

Thanks for the explanation, I did not think about it as a signal interruption.

Thus, to get this kind of API change, we need to implement a size mechanism in the Executor.

@GollyJer

GollyJer commented Apr 3, 2018


Hi everyone. I stumbled on this pull request after messing around with this concept myself for the last few days.

I may not be testing this correctly but here's what I'm doing.

```python
from time import time, strftime, sleep, gmtime
from random import randint
from concurrent.futures import ThreadPoolExecutor, as_completed
import queue


class ThreadPoolExecutorWithQueueLimit(ThreadPoolExecutor):
    def __init__(self, max_queue_size, *args, **kwargs):
        super(ThreadPoolExecutorWithQueueLimit, self).__init__(*args, **kwargs)
        self._work_queue = queue.Queue(maxsize=max_queue_size)


def nap(nap_length):
    sleep(nap_length)
    return nap_length


if __name__ == '__main__':

    startTime = time()

    range_size = 100
    max_pool_size = 10
    max_worker_count = 100

    with ThreadPoolExecutorWithQueueLimit(max_queue_size=max_pool_size,
                                          max_workers=max_worker_count) as pool_executor:
        pool = {}
        for i in range(range_size):
            function_call = pool_executor.submit(nap, randint(0, 2))
            pool[function_call] = i

        for completed_function in as_completed(pool):
            result = completed_function.result()
            i = pool[completed_function]

            print('{} completed @ {} and slept for {}'.format(
                str(i).zfill(4),
                strftime("%H:%M:%S", gmtime()),
                result))

    print('==--- Script took {} seconds. ---=='.format(
        round(time() - startTime)))
```

In this case the queue fills with all 100 threads even though I've set it to 10.
I expect the queue to fill to 10, and as each thread completes it prints to the console and the queue fills back to its max.

Am I completely off base with my expectations? What have I done wrong?
Thanks!

@prayerslayer

Author

> @tomMoral: are you still interested in working on this issue? If not, let me know, I am interested in taking over from here.

I'm interested in getting this merged somehow, but since I didn't do anything for a year it's probably unrealistic that I'll finish it at all. Please go ahead :)

@noxdafox

noxdafox commented Apr 13, 2018


Hello,

I posted a similar patch for multiprocessing.Pool long ago: https://bugs.python.org/issue19173 but it never got attention. Please let me know if I shall rework it to add it to multiprocessing.Pool as well.

I'd like to add my own 2 cents regarding this feature, as @pitrou suggested implementing it in ProcessPoolExecutor as well.

Allowing the user to set a maximum size for the internal queue communicates a false sense of control over the number of tasks which will be submitted to a ProcessPoolExecutor.
The expectation is that, if I set the maximum size of the internal job queue to 10, then the eleventh call to submit will block. Most users will be surprised when they realize this is not the case.

The reason behind this flaw is that several jobs are pulled from the internal job queue and shovelled down the Pipe or SimpleQueue (or whatever the implementation uses) by a Thread. This design ensures acceptable performance and reduces IPC latency.

As the size of the jobs may vary, as may the size of the Pipe, we have no way to control how many jobs will fit into it. Hence there is no simple and consistent way to guarantee the promised behaviour to the user.

I experienced this issue when releasing pebble 2.0. The pool allowed users to provide their own implementation of queue.Queue. I've been contacted by a few users who experienced unpredictable behaviour when providing a queue with maxsize set. After a few design iterations I realised that there was no simple and elegant way to provide such a feature, and I removed it when moving to the 3.0 release.

A much simpler and more explicit way to achieve this goal is by using a Semaphore. This can be easily implemented both for multiprocessing.Pool and for concurrent.futures.ProcessPoolExecutor.
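One way the semaphore idea could look in user code is sketched below. `BoundedSubmitter` and `slow_square` are hypothetical names invented for this illustration, and the sketch wraps a `ThreadPoolExecutor` only to keep the example short; it is not the gist or any patch discussed in this thread.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class BoundedSubmitter:
    """Hypothetical wrapper: at most `limit` submitted tasks are unfinished."""

    def __init__(self, executor, limit):
        self._executor = executor
        self._slots = threading.BoundedSemaphore(limit)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()  # blocks while `limit` tasks are pending
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()  # submission failed, give the slot back
            raise
        # Free the slot when the task finishes, fails, or is cancelled.
        future.add_done_callback(lambda _: self._slots.release())
        return future


def slow_square(x):
    time.sleep(0.01)
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    bounded = BoundedSubmitter(pool, limit=3)
    futures = []
    peak_pending = 0
    for i in range(20):
        futures.append(bounded.submit(slow_square, i))
        # Count tasks that are queued or running right now.
        pending = sum(not f.done() for f in futures)
        peak_pending = max(peak_pending, pending)

results = [f.result() for f in futures]
print(peak_pending, results[:5])
```

The limit lives entirely outside the executor, so it does not depend on which queue type the executor uses internally.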

Note that the above-mentioned issue affects only process-based pools; thread pools will actually work as expected. I could easily reproduce the issue with multiprocessing.Pool at the time. I am not sure about concurrent.futures.ProcessPoolExecutor, as I am not aware of the current status of its internals. It has changed a lot since the last time I looked at it.

@pitrou

pitrou commented Apr 13, 2018

Member

@noxdafox Using a semaphore sounds reasonable. Feel free to post a PR.

@noxdafox


@pitrou the question is: do we need to integrate this feature into the pools, considering how trivial it is to achieve such functionality with a few lines of code?

IMHO this fits more as a recipe or gist than a core functionality of the pool.

I am more concerned about the slow creeping of features leading to a complex class to maintain in the future.

@pitrou

pitrou commented Apr 13, 2018

Member

I agree with the maintainability concern. The concurrent.futures code has grown more complicated lately as it became more robust against various classes of errors.

@tomMoral

Contributor

I agree that it is a bad idea to change the size of the call_queue, as it can lead to Executor and Pool starvation and it also increases the probability of deadlocks on shutdown because of the sentinels. Using a threading.BoundedSemaphore makes it possible to decouple this mechanism from the internals of the Executor/Pool, and it is fairly easy to implement. I can do it for concurrent.futures.

However, I am unsure of what should be the expected behavior, on two points of this API change:

  • Should we limit the number of pending tasks (in the queue and being processed) or the number of tasks that are not being processed (as it was the original proposition in bpo-29595)?
  • Should the call to Executor.submit or Pool.apply_async be blocking? If so, should we add a timeout parameter when the number of tasks is too high?
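To make the second question concrete, here is a sketch of a submit that waits for a slot only up to a timeout. `submit_with_limit` is a hypothetical helper and not a proposed API. It assumes the first reading of the limit, where the semaphore counts tasks that are queued as well as tasks that are running.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def submit_with_limit(executor, slots, fn, *args, timeout=None):
    """Hypothetical helper: wait up to `timeout` seconds for a free slot."""
    # timeout=None blocks until a slot is free.
    if not slots.acquire(timeout=timeout):
        raise TimeoutError("too many pending tasks")
    try:
        future = executor.submit(fn, *args)
    except BaseException:
        slots.release()  # submission failed, give the slot back
        raise
    future.add_done_callback(lambda _: slots.release())
    return future


gate = threading.Event()
slots = threading.BoundedSemaphore(1)  # counts queued *and* running tasks

with ThreadPoolExecutor(max_workers=1) as pool:
    # This task holds the only slot until the gate is opened.
    first = submit_with_limit(pool, slots, gate.wait)
    try:
        submit_with_limit(pool, slots, print, "never runs", timeout=0.05)
        rejected = False
    except TimeoutError:
        rejected = True
    gate.set()  # let the first task finish so the pool can shut down

print(rejected, first.result())
```

Limiting only the tasks that are waiting, as in the original proposal, would instead require releasing the slot when a worker picks the task up, which cannot be done from a done-callback alone.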

Overall, this addition seems to also complicate the call to submit for the users and I am not sure it is worth it.

@noxdafox


This gist shows how trivial it is for a user to achieve such functionality.

There would be several other questions to sort out to properly add this feature to the Pools. What about map for example? How do we count a task? A single submission or the length of the iterable? What about the chunksize?

IMHO the benefit of having this feature built-in is not worth its maintenance.

akruis pushed a commit to akruis/cpython that referenced this pull request Jun 19, 2018
- avoid type punning / strict aliasing violations
- don't rely on implementation defined placement of bit-fields in the
storage unit.

(cherry picked from commit 5512131)
@brettcannon

Member

Since @prayerslayer said "it's probably unrealistic that [he will] finish" this PR, I'm going to close it (obviously people can open their own PRs to implement this functionality if they choose to).
