◐ Shell
reader mode source ↗
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
File filter
Conversations
Jump to
Diff view
Apply and reload
Show whitespace
Diff view
Apply and reload
41 changes: 25 additions & 16 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ def _add_reader(self, fd, callback, *args):
(handle, writer))
if reader is not None:
reader.cancel()

def _remove_reader(self, fd):
if self.is_closed():
Expand Down Expand Up @@ -302,6 +303,7 @@ def _add_writer(self, fd, callback, *args):
(reader, handle))
if writer is not None:
writer.cancel()

def _remove_writer(self, fd):
"""Remove a writer callback."""
Expand Down Expand Up @@ -329,7 +331,7 @@ def _remove_writer(self, fd):
def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
self._ensure_fd_no_transport(fd)
return self._add_reader(fd, callback, *args)

def remove_reader(self, fd):
"""Remove a reader callback."""
Expand All @@ -339,7 +341,7 @@ def remove_reader(self, fd):
def add_writer(self, fd, callback, *args):
"""Add a writer callback.."""
self._ensure_fd_no_transport(fd)
return self._add_writer(fd, callback, *args)

def remove_writer(self, fd):
"""Remove a writer callback."""
Expand All @@ -362,13 +364,15 @@ async def sock_recv(self, sock, n):
pass
fut = self.create_future()
fd = sock.fileno()
self.add_reader(fd, self._sock_recv, fut, sock, n)
fut.add_done_callback(
functools.partial(self._sock_read_done, fd))
return await fut

def _sock_read_done(self, fd, fut):
self.remove_reader(fd)

def _sock_recv(self, fut, sock, n):
# _sock_recv() can add itself as an I/O callback if the operation can't
Expand Down Expand Up @@ -401,9 +405,10 @@ async def sock_recv_into(self, sock, buf):
pass
fut = self.create_future()
fd = sock.fileno()
self.add_reader(fd, self._sock_recv_into, fut, sock, buf)
fut.add_done_callback(
functools.partial(self._sock_read_done, fd))
return await fut

def _sock_recv_into(self, fut, sock, buf):
Expand Down Expand Up @@ -446,11 +451,12 @@ async def sock_sendall(self, sock, data):

fut = self.create_future()
fd = sock.fileno()
fut.add_done_callback(
functools.partial(self._sock_write_done, fd))
# use a trick with a list in closure to store a mutable state
self.add_writer(fd, self._sock_sendall, fut, sock,
memoryview(data), [n])
return await fut

def _sock_sendall(self, fut, sock, view, pos):
Expand Down Expand Up @@ -502,18 +508,21 @@ def _sock_connect(self, fut, sock, address):
# connection runs in background. We have to wait until the socket
# becomes writable to be notified when the connection succeed or
# fails.
fut.add_done_callback(
functools.partial(self._sock_write_done, fd))
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(None)

def _sock_write_done(self, fd, fut):
self.remove_writer(fd)

def _sock_connect_cb(self, fut, sock, address):
if fut.done():
Expand Down
131 changes: 131 additions & 0 deletions Lib/test/test_asyncio/test_sock_lowlevel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import socket
import asyncio
import sys
from asyncio import proactor_events
Expand Down Expand Up @@ -122,6 +123,136 @@ def test_sock_client_ops(self):
sock = socket.socket()
self._basetest_sock_recv_into(httpd, sock)

async def _basetest_huge_content(self, address):
sock = socket.socket()
sock.setblocking(False)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Toggle all file notes Toggle all file annotations