From 31e7bfa6ba5354ba44677736a23facb8463077a9 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 22 Jul 2014 12:03:40 +0200 Subject: [PATCH] asyncio, tulip issue 193: Convert StreamWriter.drain() to a classic coroutine Replace also _make_drain_waiter() function with a classic _drain_helper() coroutine. --- Lib/asyncio/streams.py | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index d18db77b4f..c77eb606c2 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -141,15 +141,14 @@ class FlowControlMixin(protocols.Protocol): resume_reading() and connection_lost(). If the subclass overrides these it must call the super methods. - StreamWriter.drain() must check for error conditions and then call - _make_drain_waiter(), which will return either () or a Future - depending on the paused state. + StreamWriter.drain() must wait for _drain_helper() coroutine. """ def __init__(self, loop=None): self._loop = loop # May be None; we may never need it. self._paused = False self._drain_waiter = None + self._connection_lost = False def pause_writing(self): assert not self._paused @@ -170,6 +169,7 @@ class FlowControlMixin(protocols.Protocol): waiter.set_result(None) def connection_lost(self, exc): + self._connection_lost = True # Wake up the writer if currently paused. if not self._paused: return @@ -184,14 +184,17 @@ class FlowControlMixin(protocols.Protocol): else: waiter.set_exception(exc) - def _make_drain_waiter(self): + @coroutine + def _drain_helper(self): + if self._connection_lost: + raise ConnectionResetError('Connection lost') if not self._paused: - return () + return waiter = self._drain_waiter assert waiter is None or waiter.cancelled() waiter = futures.Future(loop=self._loop) self._drain_waiter = waiter - return waiter + yield from waiter class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): @@ -247,6 +250,8 @@ class StreamWriter: def __init__(self, transport, protocol, reader, loop): self._transport = transport self._protocol = protocol + # drain() expects that the reader has a exception() method + assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop @@ -278,26 +283,20 @@ class StreamWriter: def get_extra_info(self, name, default=None): return self._transport.get_extra_info(name, default) + @coroutine def drain(self): - """This method has an unusual return value. + """Flush the write buffer. The intended use is to write w.write(data) yield from w.drain() - - When there's nothing to wait for, drain() returns (), and the - yield-from continues immediately. When the transport buffer - is full (the protocol is paused), drain() creates and returns - a Future and the yield-from will block until that Future is - completed, which will happen when the buffer is (partially) - drained and the protocol is resumed. """ - if self._reader is not None and self._reader._exception is not None: - raise self._reader._exception - if self._transport._conn_lost: # Uses private variable. - raise ConnectionResetError('Connection lost') - return self._protocol._make_drain_waiter() + if self._reader is not None: + exc = self._reader.exception() + if exc is not None: + raise exc + yield from self._protocol._drain_helper() class StreamReader: -- 2.40.0