bpo-29595: Expose max_queue_size in ThreadPoolExecutor#143
bpo-29595: Expose max_queue_size in ThreadPoolExecutor#143prayerslayer wants to merge 10 commits into
Conversation
|
Hello, and thanks for your contribution! I'm a bot set up to make sure that the project can legally accept your contribution by verifying you have signed the PSF contributor agreement (CLA). Unfortunately we couldn't find an account corresponding to your GitHub username on bugs.python.org (b.p.o) to verify you have signed the CLA. This is necessary for legal reasons before we can look at your contribution. Please follow these steps to help rectify the issue:
Thanks again to your contribution and we look forward to looking at it! |
Sorry, something went wrong.
|
Hi, can you create an issue about this in the issue tracker https://bugs.pyhon.org |
Sorry, something went wrong.
|
I hope I did this right: http://bugs.python.org/issue29595 By the way I signed the CLA 🎉 |
Sorry, something went wrong.
|
Although the keyword |
Sorry, something went wrong.
|
Yes, that's a good point. I'd probably check for not None and > 0. Would you rather silently not set |
Sorry, something went wrong.
|
I'll let the more experienced devs weigh-in on this, but imho 'explicit is better than implicit' and I'd prob raise an exception to alert the client code that an invalid value was passed for a keyword arg. If you fail fast, this kind of a nonsensical argument will most likely be caught early instead of puzzling people when it causes bizarre behaviour in production (due to the implicit setting of keyword args). |
Sorry, something went wrong.
|
Updated the PR and added tests. Should I update the docs somewhere? |
Sorry, something went wrong.
|
Also, should I add the same feature to the |
Sorry, something went wrong.
|
Since you've changed the signature of a class, the relevant docs need the required updates too :-) I'd suggest not changing anything else until a core-dev also comes through and provides a review and an opinion on the change. If one doesn't come around for a while, "ping" the issue on the issue tracker to try and get attention to it. |
Sorry, something went wrong.
|
Updated the docs and waiting for a core dev. |
Sorry, something went wrong.
|
Fixed the build and pinging for authoritative feedback. |
Sorry, something went wrong.
|
Any updates? It's been a while and I also pinged it on the issue tracker to no avail... |
Sorry, something went wrong.
|
If this feature is approved, we should give some thought to making it a keyword-only argument. That would improve code clarity and leave the positional arguments open for future expansion if needed. Also, if we ever need to make a change, keyword arguments are easier to deprecate than positional arguments (if other positional arguments had been added in the interim). |
Sorry, something went wrong.
|
I think this is a nice feature addition overall. |
Sorry, something went wrong.
Yes, that would be reasonable, as it is vulnerable to the same problem. |
Sorry, something went wrong.
|
I think I have everything for the ThreadPoolExecutor, at least. As for the ProcessPoolExecutor it seems like the Regarding multiprocessing.Pool I can mechanically apply the same change, but I have really no idea what's going on there. |
Sorry, something went wrong.
|
I agree |
Sorry, something went wrong.
Stackless contributes two tests to builtins: TaskletExit and TaskletExit.__init__. Therefore we have to adjust the limit. Add missing changelog entries (python#143, python#144).
…on#143 - avoid type punning / strict aliasing violations - don't rely on implementation defined placement of bit-fields in the storage unit.
Stackless contributes two tests to builtins: TaskletExit and TaskletExit.__init__. Therefore we have to adjust the limit. Add missing changelog entries (python#143, python#144). (cherry picked from commit 7327e4b)
|
The recent changes by @pitrou have replaced the After some investigation, one possibility would be to revert to using a For the API change, my only concern is that the @prayerslayer are you still interested on working on this issue? If not, let me know, I am interested in taking over from here. |
Sorry, something went wrong.
Using a RLock does not make a routine signal-safe. The typical situation is:
(you can replace "signal" with "cyclic garbage collection" above for the same effect) In other words, the problem is not the lock, but the operations that are protected by the lock. SimpleQueue was designed specifically for |
Sorry, something went wrong.
Thanks for the explanation, I did not think about it as a signal interruption. Thus, to get this kind of API changes, we need to implement a size mechanism in the |
Sorry, something went wrong.
|
Hi everyone. I stumbled on this pull request after messing around with this concept myself for the last few days. I may not be testing this correctly but here's what I'm doing. from time import time, strftime, sleep, gmtime
from random import randint
from concurrent.futures import ThreadPoolExecutor, as_completed
import queue
class ThreadPoolExecutorWithQueueLimit(ThreadPoolExecutor):
def __init__(self, max_queue_size, *args, **kwargs):
super(ThreadPoolExecutorWithQueueLimit, self).__init__(*args, **kwargs)
self._work_queue = queue.Queue(maxsize=max_queue_size)
def nap(nap_length):
sleep(nap_length)
return nap_length
if __name__ == '__main__':
startTime = time()
range_size = 100
max_pool_size = 10
max_worker_count = 100
with ThreadPoolExecutorWithQueueLimit(max_queue_size=max_pool_size,
max_workers=max_worker_count) as pool_executor:
pool = {}
for i in range(range_size):
function_call = pool_executor.submit(nap, randint(0, 2))
pool[function_call] = i
for completed_function in as_completed(pool):
result = completed_function.result()
i = pool[completed_function]
print('{} completed @ {} and slept for {}'.format(
str(i).zfill(4),
strftime("%H:%M:%S", gmtime()),
result))
print('==--- Script took {} seconds. ---=='.format(
round(time() - startTime)))In this case the queue fills with all 100 threads even though I've set it to 10. Am I completely off base with my expectations? What have I done wrong? |
Sorry, something went wrong.
I'm interested in getting this merged somehow, but since I didn't do anything for a year it's probably unrealistic that I'll finish it at all. Please go ahead :) |
Sorry, something went wrong.
|
Hello, I posted a similar patch for I'd like to add my own 2 cents in regards of this feature as @pitrou suggested to implement it in Allowing the user to set a maximum size to the internal queue communicates a false sense of control over the amount of tasks which will be submitted to a The reason behind this flaw is that several jobs are pulled from the internal job queue and shovelled down the As the size of the jobs may vary as well as for the size of the I experienced this issue when releasing A much simpler and explicit way to achieve this goal is by using a Semaphore. This can be easily implemented both for Note that the above mentioned issue is affecting only process based pools, thread pools will actually work as expected. I could easily reproduce the issue with |
Sorry, something went wrong.
|
@noxdafox Using a semaphore sounds reasonable. Feel free to post a PR. |
Sorry, something went wrong.
|
@pitrou the question is do we need to integrate this feature in the pools considering how trivial is to achieve such functionality with few lines of code? IMHO this fits more as a recipe or gist than a core functionality of the pool. I am more concerned about the slow creeping of features leading to a complex class to maintain in the future. |
Sorry, something went wrong.
|
I agree with the maintainability concern. The |
Sorry, something went wrong.
|
I agree that this is a bad idea to change the size of the However, I am unsure of what should be the expected behavior, on two points of this API change:
Overall, this addition seems to also complicate the call to |
Sorry, something went wrong.
|
This gist shows how trivial is for a user to achieve such functionality. There would be several other questions to sort out to properly add this feature to the Pools. What about IMHO the benefit of having this feature built-in is not worth its maintenance. |
Sorry, something went wrong.
- avoid type punning / strict aliasing violations - don't rely on implementation defined placement of bit-fields in the storage unit. (cherry picked from commit 5512131)
|
Since @prayerslayer said "it's probably unrealistic that [he will] finish" this PR, I'm going to close it (obviously people can open their own PRs to implement this functionality if they choose to). |
Sorry, something went wrong.
Hi!
Please forgive this blunt pull request, I wanted to open an issue first and ask, but couldn't.
The situation I ran into recently was that I used
ThreadPoolExecutorto parallelize AWS API calls; I had to move data from one S3 bucket to another (~150M objects). Contrary to what I expected the maximum size of the underlying queue doesn't have a non-zero value by default. Thus my process ended up consuming gigabytes of memory, because it put more items into the queue than the threads were able to work off: The queue just kept growing. (It ran on K8s and the pod was rightfully killed eventually.)Of course there ways to work around this. One could use more threads, to some extent. Or you could use your own queue with a defined maximum size. But I think it's more work for users of Python than necessary.
So this pull request exposes a
max_queue_sizeparameter for theThreadPoolExecutorwhich will be forwarded toqueue.Queue(). It defaults to0, so backward-compatibility is ensured. I am happy to add tests if you'd give me further instructions. I would've done this already, but I'm largely unfamiliar with this project as well as the language 😅I hope you find this as useful as I would and am looking forward to read your thoughts about it!
https://bugs.python.org/issue29595