Expand Up
@@ -199,9 +199,6 @@ async def _drain_helper(self):
self._drain_waiter = waiter
await waiter
def _get_close_waiter(self, stream):
raise NotImplementedError
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
"""Helper class to adapt between Protocol and StreamReader.
Expand Down
Expand Up
@@ -318,9 +315,6 @@ def eof_received(self):
return False
return True
def _get_close_waiter(self, stream):
return self._closed
def __del__(self):
# Prevent reports about unhandled exceptions.
# Better than self._closed._log_traceback = False hack
Expand Down
Expand Up
@@ -418,7 +412,7 @@ def is_closing(self):
return self._transport.is_closing()
async def wait_closed(self):
await self._protocol._get_close_waiter(self)
await self._protocol._closed
def get_extra_info(self, name, default=None):
return self._transport.get_extra_info(name, default)
Expand All
@@ -436,12 +430,13 @@ async def drain(self):
if exc is not None:
raise exc
if self._transport.is_closing():
# Wait for protocol.connection_lost() call
# Raise connection closing error if any,
# ConnectionResetError otherwise
fut = self._protocol._get_close_waiter(self)
await fut
raise ConnectionResetError('Connection lost')
# Yield to the event loop so connection_lost() may be
# called. Without this, _drain_helper() would return
# immediately, and code that calls
# write(...); await drain()
# in a loop would never call connection_lost(), so it
# would not see an error when the socket is closed.
await sleep(0, loop=self._loop)
await self._protocol._drain_helper()
Expand Down