gh-96471: Add asyncio queue shutdown#104228
Conversation
* Include docs
* Queue state enum members are capitalised * Termination state in str/repr * Include raised exception in docstrings * Factor out queue-state checks and updates to methods * Logic fixes in get_nowait and shutdown * Handle queue shutdown in task_done and join * Updated tests * Document feature added in 3.13
|
I assume the "on top of ..." part is obsolete, since that PR was closed without merging. I'll add @willingc as a reviewer per your request. |
Sorry, something went wrong.
Done, but like #104750, we may want to modify the implementation of |
Sorry, something went wrong.
|
Eh, @EpicWink, did you see the hubbub about the hanging test in your previous PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it? (At least the feature wasn't reverted.) |
Sorry, something went wrong.
I did see the follow-on issue and pull-requests. Yves and I are working on fixing it (you can follow the changes here: main...EpicWink:cpython:fix-thread-queue-shutdown-test) |
Sorry, something went wrong.
|
Awesome! (An acknowledgement that you were working on it would have lessened my stress. :-) |
Sorry, something went wrong.
|
Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: It appears that |
Sorry, something went wrong.
|
Now that the immediate-consume implementation of the threading queue shutdown has been accepted, I'm going to do the same here and for multiprocessing. I'll personally rewrite the tests to be more readable and obvious.
Laurie
…________________________________
From: Guido van Rossum ***@***.***>
Sent: Thursday, February 22, 2024 11:00:09 AM
To: python/cpython ***@***.***>
Cc: Laurie O ***@***.***>; Mention ***@***.***>
Subject: Re: [python/cpython] gh-96471: Add asyncio queue shutdown (PR #104228)
Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: It appears that q.join() does not raise ShutDown when the queue is shutdown immediately. The tests don't seem to be testing for this either. (Come to think of it, the test never seems to take the except self.queue.ShutDown path -- if I put a breakpoint there it never gets hit during any of the tests. The test logic is pretty convoluted, which is my only excuse for not having caught this in the review of gh-104750<#104750>.)
—
Reply to this email directly, view it on GitHub<#104228 (comment)>, or unsubscribe<https://github.com/notifications/unsubscribe-auth/AF72GRNKMPJRCRIJHOB4NUDYU2KBTAVCNFSM6AAAAAAXX3PWIGVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMYTSNJYGQ3TENZYGQ>.
You are receiving this because you were mentioned.Message ID: ***@***.***>
|
Sorry, something went wrong.
And what about |
Sorry, something went wrong.
gvanrossum
left a comment
There was a problem hiding this comment.
I'll wait reviewing until you've rewritten shutdown, but here's one doc markup nit.
Also, please mark PRs that aren't ready for review as Draft.
Sorry, something went wrong.
The behaviour should (and I intend to implement it to) be the same between the threading, multiprocessing and asyncio queues |
Sorry, something went wrong.
|
By the way, should |
Sorry, something went wrong.
|
Sorry, something went wrong.
@gvanrossum please don't trust, have you seen
Oops, sorry. I forgot to hold off my most recent push
No, I think it was because I hadn't updated the implementation to match the new agreed-upon behaviour (this PR was in draft at that time anyway, so I don't think the label was needed).
I should probably add a comment |
Sorry, something went wrong.
|
Sorry, something went wrong.
I have seen and commented |
Sorry, something went wrong.
gvanrossum
left a comment
There was a problem hiding this comment.
Yup! This version looks great. Let me know if you agree -- I've removed the DO-NOT-MERGE label, but I'll wait until you and @YvesDup are happy too.
Sorry, something went wrong.
|
I think it's good to go
I could do this, but I don't think it's worth revoking the PR's approval |
Sorry, something went wrong.
Co-authored-by: Duprat <yduprat@gmail.com>
|
I'd like to add that the async def async_map[T, R](
func: Callable[[T], Awaitable[R]],
iterable: AsyncIterable[T],
*,
limit: int,
maxsize: int = -1,
) -> AsyncIterator[R]:
if maxsize < 0:
maxsize = limit
arguments_queue = Queue[T](maxsize=maxsize)
results_queue = Queue[R](maxsize=maxsize)
async def drain():
async for argument in iterable:
await arguments_queue.put(argument)
arguments_queue.shutdown(immediate=False)
async def worker():
while True:
try:
argument = await arguments_queue.get()
except QueueShutDown:
break
await results_queue.put(await func(argument))
async def background():
async with asyncio.TaskGroup() as background_task_group:
background_task_group.create_task(drain())
for _ in range(limit):
background_task_group.create_task(worker())
results_queue.shutdown(immediate=False)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(background())
while True:
try:
yield await results_queue.get()
except QueueShutDown:
breakI believe this has a common issue that a lot of async generators do, in that if you don't consume the entire generator, it will still continue processing and end up with a lot of futures never awaited. I know there's some |
Sorry, something went wrong.
asyncio-only changes from #102499 (which supercedes #96474), updated to match the API introduced by #104750
📚 Documentation preview 📚: https://cpython-previews--104228.org.readthedocs.build/