gh-113538: Allow client connections to be closed#114432
Conversation
Give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.
|
PR includes unit tests, but I also used this more complete example to test and showcase the functionality: #!/usr/bin/python3
import asyncio
import socket
# Well behaved or spammy protocol?
FLOOD = True
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
self._paused = False
self._write()
def pause_writing(self):
self._paused = True
def resume_writing(self):
self._paused = False
self._write()
def _write(self):
if not FLOOD:
return
if self._paused:
return
self.transport.write(b'a' * 65536)
asyncio.get_running_loop().call_soon(self._write)
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
def stop():
loop = asyncio.get_running_loop()
loop.stop()
client = None
def server_up(fut):
global server
server = fut.result()
print('Server running')
global client
client = socket.create_connection(('127.0.0.1', 8888))
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
fut = asyncio.ensure_future(loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888))
fut.add_done_callback(server_up)
# Simulate termination
loop.call_later(2.5, stop)
try:
loop.run_forever()
finally:
print('Closing...')
server.close()
try:
loop.run_until_complete(asyncio.wait_for(server.wait_closed(), 0.1))
except TimeoutError:
print('Timeout waiting for clients. Attempting close...')
server.close_clients()
try:
loop.run_until_complete(asyncio.wait_for(server.wait_closed(), 0.1))
except TimeoutError:
print('Timeout waiting for client close. Attempting abort...')
server.abort_clients()
loop.run_until_complete(asyncio.wait_for(server.wait_closed(), 0.1))
loop.close()
if client is not None:
client.close()
print('Closed') |
Sorry, something went wrong.
gvanrossum
left a comment
There was a problem hiding this comment.
This looks great, but I have one reservation. The server now keeps the transports alive, through the set of clients. Maybe it would be better if that was a weak set, so that if somehow a transport was freed without going through the usual close routine, the server doesn't hold onto it forever? Thoughts?
Sorry, something went wrong.
|
I figured I'd start things simple, so no weak references unless there is a practical issue that requires them. So, are there any scenarios where this is relevant? The transport detaches from the server in the same place where it closes the server. That should mean that this is something that must be done, as otherwise the destructor will complain. So I don't think well-written code should have an issue. Might there be an issue with badly written code, then? The primary case would be if the destructor check fails to trigger. Which I guess is the case with my suggestion. It is difficult to trigger, though. Streams are a bit buggy, so abandoning them doesn't always result in a cleanup (#114914), and transports are referenced by the event loop until you tell them to pause reading. But difficult isn't impossible, so this case breaks when applying this PR: #!/usr/bin/python3
import asyncio
import socket
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
self.transport.pause_reading()
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
client = None
def server_up(fut):
global server
server = fut.result()
print('Server running')
global client
client = socket.create_connection(('127.0.0.1', 8888))
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
fut = asyncio.ensure_future(loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888))
fut.add_done_callback(server_up)
# Simulate termination
loop.call_later(2.5, loop.stop)
try:
loop.run_forever()
import gc
gc.collect()
finally:
server.close()
loop.close()
if client is not None:
client.close()This case creates a transport, pauses it, and then fails to maintain any other reference to it. Without this PR it gets collected when So a weak reference does indeed seem appropriate. |
Sorry, something went wrong.
We want to be able to detect if the application fails to keep track of the transports, so we cannot keep them alive by using a hard reference.
The application might be waiting for all transports to close, so we need to properly inform the server that this transport is done.
|
Sorry for the delay -- this just made it to the top of my review queue. I hope to get to this soon! |
Sorry, something went wrong.
gvanrossum
left a comment
There was a problem hiding this comment.
Not bad, some thoughts.
Sorry, something went wrong.
One could be made clearar, and the other is probably superfluous.
Try to get the streams and the kernel in to a more deterministic state by specifying fixed buffering limits.
gvanrossum
left a comment
There was a problem hiding this comment.
Thanks for fixing all that. I have one remaining concern only.
Sorry, something went wrong.
No possibly infinite loop. Instead ask the system how much buffer space it has and fill that.
gvanrossum
left a comment
There was a problem hiding this comment.
LG, but can you fix the merge conflict? The tests won't run until that's fixed.
Sorry, something went wrong.
|
Sure. How do you prefer to handle that? A rebase against current |
Sorry, something went wrong.
|
No rebase please! Please merge the updated main branch into your branch, fix the conflicts as part of the merge, and then commit and push (not push -f). The extra commits will be squashed when I merge back into main. |
Sorry, something went wrong.
gvanrossum
left a comment
There was a problem hiding this comment.
Thanks! LGTM. I'll merge.
Sorry, something went wrong.
|
It looks like one of the new tests isn't always passing: #116423 (comment) |
Sorry, something went wrong.
…hon#114432)" Reason: The new test doesn't always pass: python#116423 (comment) This reverts commit 1d0d49a.
Thanks, I've created a new PR on the same issue that will revert it. Sorry for the inconvenience. |
Sorry, something went wrong.
|
The PR has been reverted. @CendioOssman could you look into the test failures? I probably should have reviewed the test more carefully, sorry. |
Sorry, something went wrong.
|
Of course. And once they are fixed, how do you want to proceed? Submit a new PR using the existing branch? |
Sorry, something went wrong.
That sounds fine to me. If you can't get it to work, just rename the branch locally and push that. |
Sorry, something went wrong.
These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work. This is a do-over with a test fix for gh-114432, which was reverted.
…on#116784) These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work. This is a do-over with a test fix for pythongh-114432, which was reverted.
These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.
…ort}_clients (python#114432)" (python#116632) Revert "pythongh-113538: Add asycio.Server.{close,abort}_clients (python#114432)" Reason: The new test doesn't always pass: python#116423 (comment) This reverts commit 1d0d49a.
…on#116784) These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work. This is a do-over with a test fix for pythongh-114432, which was reverted.
These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.
…ort}_clients (python#114432)" (python#116632) Revert "pythongh-113538: Add asycio.Server.{close,abort}_clients (python#114432)" Reason: The new test doesn't always pass: python#116423 (comment) This reverts commit 1d0d49a.
…on#116784) These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work. This is a do-over with a test fix for pythongh-114432, which was reverted.
Give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.
📚 Documentation preview 📚: https://cpython-previews--114432.org.readthedocs.build/