◐ Shell
clean mode source ↗

[3.9] bpo-30064: Fix asyncio loop.sock_* race condition issue (GH-20369) by miss-islington · Pull Request #20460 · python/cpython

Expand Up @@ -266,6 +266,7 @@ def _add_reader(self, fd, callback, *args): (handle, writer)) if reader is not None: reader.cancel() return handle
def _remove_reader(self, fd): if self.is_closed(): Expand Down Expand Up @@ -302,6 +303,7 @@ def _add_writer(self, fd, callback, *args): (reader, handle)) if writer is not None: writer.cancel() return handle
def _remove_writer(self, fd): """Remove a writer callback.""" Expand Down Expand Up @@ -329,7 +331,7 @@ def _remove_writer(self, fd): def add_reader(self, fd, callback, *args): """Add a reader callback.""" self._ensure_fd_no_transport(fd) return self._add_reader(fd, callback, *args) self._add_reader(fd, callback, *args)
def remove_reader(self, fd): """Remove a reader callback.""" Expand All @@ -339,7 +341,7 @@ def remove_reader(self, fd): def add_writer(self, fd, callback, *args): """Add a writer callback..""" self._ensure_fd_no_transport(fd) return self._add_writer(fd, callback, *args) self._add_writer(fd, callback, *args)
def remove_writer(self, fd): """Remove a writer callback.""" Expand All @@ -362,13 +364,15 @@ async def sock_recv(self, sock, n): pass fut = self.create_future() fd = sock.fileno() self.add_reader(fd, self._sock_recv, fut, sock, n) self._ensure_fd_no_transport(fd) handle = self._add_reader(fd, self._sock_recv, fut, sock, n) fut.add_done_callback( functools.partial(self._sock_read_done, fd)) functools.partial(self._sock_read_done, fd, handle=handle)) return await fut
def _sock_read_done(self, fd, fut): self.remove_reader(fd) def _sock_read_done(self, fd, fut, handle=None): if handle is None or not handle.cancelled(): self.remove_reader(fd)
def _sock_recv(self, fut, sock, n): # _sock_recv() can add itself as an I/O callback if the operation can't Expand Down Expand Up @@ -401,9 +405,10 @@ async def sock_recv_into(self, sock, buf): pass fut = self.create_future() fd = sock.fileno() self.add_reader(fd, self._sock_recv_into, fut, sock, buf) self._ensure_fd_no_transport(fd) handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) fut.add_done_callback( functools.partial(self._sock_read_done, fd)) functools.partial(self._sock_read_done, fd, handle=handle)) return await fut
def _sock_recv_into(self, fut, sock, buf): Expand Down Expand Up @@ -446,11 +451,12 @@ async def sock_sendall(self, sock, data):
fut = self.create_future() fd = sock.fileno() fut.add_done_callback( functools.partial(self._sock_write_done, fd)) self._ensure_fd_no_transport(fd) # use a trick with a list in closure to store a mutable state self.add_writer(fd, self._sock_sendall, fut, sock, memoryview(data), [n]) handle = self._add_writer(fd, self._sock_sendall, fut, sock, memoryview(data), [n]) fut.add_done_callback( functools.partial(self._sock_write_done, fd, handle=handle)) return await fut
def _sock_sendall(self, fut, sock, view, pos): Expand Down Expand Up @@ -502,18 +508,21 @@ def _sock_connect(self, fut, sock, address): # connection runs in background. We have to wait until the socket # becomes writable to be notified when the connection succeed or # fails. self._ensure_fd_no_transport(fd) handle = self._add_writer( fd, self._sock_connect_cb, fut, sock, address) fut.add_done_callback( functools.partial(self._sock_write_done, fd)) self.add_writer(fd, self._sock_connect_cb, fut, sock, address) functools.partial(self._sock_write_done, fd, handle=handle)) except (SystemExit, KeyboardInterrupt): raise except BaseException as exc: fut.set_exception(exc) else: fut.set_result(None)
def _sock_write_done(self, fd, fut): self.remove_writer(fd) def _sock_write_done(self, fd, fut, handle=None): if handle is None or not handle.cancelled(): self.remove_writer(fd)
def _sock_connect_cb(self, fut, sock, address): if fut.done(): Expand Down