]> granicus.if.org Git - python/commitdiff
asyncio.transports: Make _ProactorBasePipeTransport use _FlowControlMixin
authorYury Selivanov <yselivanov@sprymix.com>
Tue, 18 Feb 2014 23:41:13 +0000 (18:41 -0500)
committerYury Selivanov <yselivanov@sprymix.com>
Tue, 18 Feb 2014 23:41:13 +0000 (18:41 -0500)
Lib/asyncio/proactor_events.py
Lib/asyncio/selector_events.py
Lib/asyncio/transports.py
Lib/asyncio/unix_events.py

index b2ac632fa4c8ec99f4b0132d4b4ace8bcfb35184..d72f9274a8342f51f9a7ac298539376c97a52b9f 100644 (file)
@@ -15,7 +15,8 @@ from . import transports
 from .log import logger
 
 
-class _ProactorBasePipeTransport(transports.BaseTransport):
+class _ProactorBasePipeTransport(transports._FlowControlMixin,
+                                 transports.BaseTransport):
     """Base class for pipe and socket transports."""
 
     def __init__(self, loop, sock, protocol, waiter=None,
@@ -33,8 +34,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
         self._conn_lost = 0
         self._closing = False  # Set when close() called.
         self._eof_written = False
-        self._protocol_paused = False
-        self.set_write_buffer_limits()
         if self._server is not None:
             self._server.attach(self)
         self._loop.call_soon(self._protocol.connection_made, self)
@@ -94,56 +93,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
                 server.detach(self)
                 self._server = None
 
-    # XXX The next four methods are nearly identical to corresponding
-    # ones in _SelectorTransport.  Maybe refactor buffer management to
-    # share the implementations?  (Also these are really only needed
-    # by _ProactorWritePipeTransport but since _buffer is defined on
-    # the base class I am putting it here for now.)
-
-    def _maybe_pause_protocol(self):
-        size = self.get_write_buffer_size()
-        if size <= self._high_water:
-            return
-        if not self._protocol_paused:
-            self._protocol_paused = True
-            try:
-                self._protocol.pause_writing()
-            except Exception as exc:
-                self._loop.call_exception_handler({
-                    'message': 'protocol.pause_writing() failed',
-                    'exception': exc,
-                    'transport': self,
-                    'protocol': self._protocol,
-                })
-
-    def _maybe_resume_protocol(self):
-        if (self._protocol_paused and
-            self.get_write_buffer_size() <= self._low_water):
-            self._protocol_paused = False
-            try:
-                self._protocol.resume_writing()
-            except Exception as exc:
-                self._loop.call_exception_handler({
-                    'message': 'protocol.resume_writing() failed',
-                    'exception': exc,
-                    'transport': self,
-                    'protocol': self._protocol,
-                })
-
-    def set_write_buffer_limits(self, high=None, low=None):
-        if high is None:
-            if low is None:
-                high = 64*1024
-            else:
-                high = 4*low
-        if low is None:
-            low = high // 4
-        if not high >= low >= 0:
-            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
-                             (high, low))
-        self._high_water = high
-        self._low_water = low
-
     def get_write_buffer_size(self):
         size = self._pending_write
         if self._buffer is not None:
index fb86f8245b1ae2af7647ad456b787d959bb4ec54..869d66e0c8f5c0e6967f43b6a91887c0a88f79b5 100644 (file)
@@ -338,77 +338,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         sock.close()
 
 
-class _FlowControlMixin(transports.Transport):
-    """All the logic for (write) flow control in a mix-in base class.
-
-    The subclass must implement get_write_buffer_size().  It must call
-    _maybe_pause_protocol() whenever the write buffer size increases,
-    and _maybe_resume_protocol() whenever it decreases.  It may also
-    override set_write_buffer_limits() (e.g. to specify different
-    defaults).
-
-    The subclass constructor must call super().__init__(extra).  This
-    will call set_write_buffer_limits().
-
-    The user may call set_write_buffer_limits() and
-    get_write_buffer_size(), and their protocol's pause_writing() and
-    resume_writing() may be called.
-    """
-
-    def __init__(self, extra=None):
-        super().__init__(extra)
-        self._protocol_paused = False
-        self.set_write_buffer_limits()
-
-    def _maybe_pause_protocol(self):
-        size = self.get_write_buffer_size()
-        if size <= self._high_water:
-            return
-        if not self._protocol_paused:
-            self._protocol_paused = True
-            try:
-                self._protocol.pause_writing()
-            except Exception as exc:
-                self._loop.call_exception_handler({
-                    'message': 'protocol.pause_writing() failed',
-                    'exception': exc,
-                    'transport': self,
-                    'protocol': self._protocol,
-                })
-
-    def _maybe_resume_protocol(self):
-        if (self._protocol_paused and
-            self.get_write_buffer_size() <= self._low_water):
-            self._protocol_paused = False
-            try:
-                self._protocol.resume_writing()
-            except Exception as exc:
-                self._loop.call_exception_handler({
-                    'message': 'protocol.resume_writing() failed',
-                    'exception': exc,
-                    'transport': self,
-                    'protocol': self._protocol,
-                })
-
-    def set_write_buffer_limits(self, high=None, low=None):
-        if high is None:
-            if low is None:
-                high = 64*1024
-            else:
-                high = 4*low
-        if low is None:
-            low = high // 4
-        if not high >= low >= 0:
-            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
-                             (high, low))
-        self._high_water = high
-        self._low_water = low
-
-    def get_write_buffer_size(self):
-        raise NotImplementedError
-
-
-class _SelectorTransport(_FlowControlMixin, transports.Transport):
+class _SelectorTransport(transports._FlowControlMixin,
+                         transports.Transport):
 
     max_size = 256 * 1024  # Buffer size passed to recv().
 
index 67ae7fda9033b64da39a56b4a020a4925bdcdcf6..5b975aa7315c269ade88a5495aede2e1d67c46b7 100644 (file)
@@ -219,3 +219,73 @@ class SubprocessTransport(BaseTransport):
         http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
         """
         raise NotImplementedError
+
+
+class _FlowControlMixin(Transport):
+    """All the logic for (write) flow control in a mix-in base class.
+
+    The subclass must implement get_write_buffer_size().  It must call
+    _maybe_pause_protocol() whenever the write buffer size increases,
+    and _maybe_resume_protocol() whenever it decreases.  It may also
+    override set_write_buffer_limits() (e.g. to specify different
+    defaults).
+
+    The subclass constructor must call super().__init__(extra).  This
+    will call set_write_buffer_limits().
+
+    The user may call set_write_buffer_limits() and
+    get_write_buffer_size(), and their protocol's pause_writing() and
+    resume_writing() may be called.
+    """
+
+    def __init__(self, extra=None):
+        super().__init__(extra)
+        self._protocol_paused = False
+        self.set_write_buffer_limits()
+
+    def _maybe_pause_protocol(self):
+        size = self.get_write_buffer_size()
+        if size <= self._high_water:
+            return
+        if not self._protocol_paused:
+            self._protocol_paused = True
+            try:
+                self._protocol.pause_writing()
+            except Exception as exc:
+                self._loop.call_exception_handler({
+                    'message': 'protocol.pause_writing() failed',
+                    'exception': exc,
+                    'transport': self,
+                    'protocol': self._protocol,
+                })
+
+    def _maybe_resume_protocol(self):
+        if (self._protocol_paused and
+            self.get_write_buffer_size() <= self._low_water):
+            self._protocol_paused = False
+            try:
+                self._protocol.resume_writing()
+            except Exception as exc:
+                self._loop.call_exception_handler({
+                    'message': 'protocol.resume_writing() failed',
+                    'exception': exc,
+                    'transport': self,
+                    'protocol': self._protocol,
+                })
+
+    def set_write_buffer_limits(self, high=None, low=None):
+        if high is None:
+            if low is None:
+                high = 64*1024
+            else:
+                high = 4*low
+        if low is None:
+            low = high // 4
+        if not high >= low >= 0:
+            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
+                             (high, low))
+        self._high_water = high
+        self._low_water = low
+
+    def get_write_buffer_size(self):
+        raise NotImplementedError
index 9a40c04d1ead66450aea7c51bc82d4f603f24a75..748452c588618267214713821ab90fdc2f9793cb 100644 (file)
@@ -317,7 +317,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
             self._loop = None
 
 
-class _UnixWritePipeTransport(selector_events._FlowControlMixin,
+class _UnixWritePipeTransport(transports._FlowControlMixin,
                               transports.WriteTransport):
 
     def __init__(self, loop, pipe, protocol, waiter=None, extra=None):