]> granicus.if.org Git - python/commitdiff
bpo-32356: idempotent pause_/resume_reading; new is_reading method. (#4914)
authorYury Selivanov <yury@magic.io>
Mon, 18 Dec 2017 22:03:23 +0000 (17:03 -0500)
committerGitHub <noreply@github.com>
Mon, 18 Dec 2017 22:03:23 +0000 (17:03 -0500)
Doc/library/asyncio-protocol.rst
Lib/asyncio/proactor_events.py
Lib/asyncio/selector_events.py
Lib/asyncio/sslproto.py
Lib/asyncio/transports.py
Lib/test/test_asyncio/test_proactor_events.py
Lib/test/test_asyncio/test_selector_events.py
Lib/test/test_asyncio/utils.py
Misc/NEWS.d/next/Library/2017-12-17-22-50-51.bpo-32356.roZJpA.rst [new file with mode: 0644]

index a4b0d594933ced7f4d5de3133e1195a2a1c93077..3aa1f2f2e99647a61a5aeba39c98d6fecc616f82 100644 (file)
@@ -118,17 +118,31 @@ ReadTransport
 
    Interface for read-only transports.
 
+   .. method:: is_reading()
+
+      Return ``True`` if the transport is receiving new data.
+
+      .. versionadded:: 3.7
+
    .. method:: pause_reading()
 
       Pause the receiving end of the transport.  No data will be passed to
       the protocol's :meth:`data_received` method until :meth:`resume_reading`
       is called.
 
+      .. versionchanged:: 3.7
+         The method is idempotent, i.e. it can be called when the
+         transport is already paused or closed.
+
    .. method:: resume_reading()
 
       Resume the receiving end.  The protocol's :meth:`data_received` method
       will be called once again if some data is available for reading.
 
+      .. versionchanged:: 3.7
+         The method is idempotent, i.e. it can be called when the
+         transport is already reading.
+
 
 WriteTransport
 --------------
index 291d989cc45067836ef0bf493307586bced16269..915ad1ae91c317b46ea343145d649ca27c70228c 100644 (file)
@@ -152,21 +152,20 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
         self._paused = False
         self._loop.call_soon(self._loop_reading)
 
+    def is_reading(self):
+        return not self._paused and not self._closing
+
     def pause_reading(self):
-        if self._closing:
-            raise RuntimeError('Cannot pause_reading() when closing')
-        if self._paused:
-            raise RuntimeError('Already paused')
+        if self._closing or self._paused:
+            return
         self._paused = True
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
 
     def resume_reading(self):
-        if not self._paused:
-            raise RuntimeError('Not paused')
-        self._paused = False
-        if self._closing:
+        if self._closing or not self._paused:
             return
+        self._paused = False
         self._loop.call_soon(self._loop_reading, self._read_fut)
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
index cb33cd34b87fd5dbf98cecf111f1aeb64a944650..3f44a998b895a250411c1b41d4c475c7388e2717 100644 (file)
@@ -702,22 +702,21 @@ class _SelectorSocketTransport(_SelectorTransport):
             self._loop.call_soon(futures._set_result_unless_cancelled,
                                  waiter, None)
 
+    def is_reading(self):
+        return not self._paused and not self._closing
+
     def pause_reading(self):
-        if self._closing:
-            raise RuntimeError('Cannot pause_reading() when closing')
-        if self._paused:
-            raise RuntimeError('Already paused')
+        if self._closing or self._paused:
+            return
         self._paused = True
         self._loop._remove_reader(self._sock_fd)
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
 
     def resume_reading(self):
-        if not self._paused:
-            raise RuntimeError('Not paused')
-        self._paused = False
-        if self._closing:
+        if self._closing or not self._paused:
             return
+        self._paused = False
         self._loop._add_reader(self._sock_fd, self._read_ready)
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
index 0c8f01ad8f1a8e95dfcf07633ff03df82fcfa8e7..8da8570d66d4fb67913f2cff18fff839507beb26 100644 (file)
@@ -317,6 +317,12 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
                           source=self)
             self.close()
 
+    def is_reading(self):
+        tr = self._ssl_protocol._transport
+        if tr is None:
+            raise RuntimeError('SSL transport has not been initialized yet')
+        return tr.is_reading()
+
     def pause_reading(self):
         """Pause the receiving end.
 
index 51f56737c67afbab97ee9d28683c1a6df14bf2d7..233bbb53cb6a3f92833232fdbcd4c0a1002722b6 100644 (file)
@@ -44,6 +44,10 @@ class BaseTransport:
 class ReadTransport(BaseTransport):
     """Interface for read-only transports."""
 
+    def is_reading(self):
+        """Return True if the transport is receiving."""
+        raise NotImplementedError
+
     def pause_reading(self):
         """Pause the receiving end.
 
index 910f2596620f77dfb076acf5257e5ac1f78f1a30..47ebcad238c440a366df99c2fc25da09efd47cb5 100644 (file)
@@ -334,26 +334,36 @@ class ProactorSocketTransportTests(test_utils.TestCase):
             f = asyncio.Future(loop=self.loop)
             f.set_result(msg)
             futures.append(f)
+
         self.loop._proactor.recv.side_effect = futures
         self.loop._run_once()
         self.assertFalse(tr._paused)
+        self.assertTrue(tr.is_reading())
         self.loop._run_once()
         self.protocol.data_received.assert_called_with(b'data1')
         self.loop._run_once()
         self.protocol.data_received.assert_called_with(b'data2')
+
+        tr.pause_reading()
         tr.pause_reading()
         self.assertTrue(tr._paused)
+        self.assertFalse(tr.is_reading())
         for i in range(10):
             self.loop._run_once()
         self.protocol.data_received.assert_called_with(b'data2')
+
+        tr.resume_reading()
         tr.resume_reading()
         self.assertFalse(tr._paused)
+        self.assertTrue(tr.is_reading())
         self.loop._run_once()
         self.protocol.data_received.assert_called_with(b'data3')
         self.loop._run_once()
         self.protocol.data_received.assert_called_with(b'data4')
         tr.close()
 
+        self.assertFalse(tr.is_reading())
+
 
     def pause_writing_transport(self, high):
         tr = self.socket_transport()
index b1ca3fcf0b18c372a2426242c80b4df283ae69f3..89c3d5e16c1db372b83e86875844958cbd3b4812 100644 (file)
@@ -80,10 +80,23 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         with test_utils.disable_logger():
             transport = self.loop._make_ssl_transport(
                 m, asyncio.Protocol(), m, waiter)
+
+            with self.assertRaisesRegex(RuntimeError,
+                                        r'SSL transport.*not.*initialized'):
+                transport.is_reading()
+
             # execute the handshake while the logger is disabled
             # to ignore SSL handshake failure
             test_utils.run_briefly(self.loop)
 
+        self.assertTrue(transport.is_reading())
+        transport.pause_reading()
+        transport.pause_reading()
+        self.assertFalse(transport.is_reading())
+        transport.resume_reading()
+        transport.resume_reading()
+        self.assertTrue(transport.is_reading())
+
         # Sanity check
         class_name = transport.__class__.__name__
         self.assertIn("ssl", class_name.lower())
@@ -894,15 +907,24 @@ class SelectorSocketTransportTests(test_utils.TestCase):
         tr = self.socket_transport()
         test_utils.run_briefly(self.loop)
         self.assertFalse(tr._paused)
+        self.assertTrue(tr.is_reading())
         self.loop.assert_reader(7, tr._read_ready)
+
+        tr.pause_reading()
         tr.pause_reading()
         self.assertTrue(tr._paused)
-        self.assertFalse(7 in self.loop.readers)
+        self.assertFalse(tr.is_reading())
+        self.loop.assert_no_reader(7)
+
+        tr.resume_reading()
         tr.resume_reading()
         self.assertFalse(tr._paused)
+        self.assertTrue(tr.is_reading())
         self.loop.assert_reader(7, tr._read_ready)
-        with self.assertRaises(RuntimeError):
-            tr.resume_reading()
+
+        tr.close()
+        self.assertFalse(tr.is_reading())
+        self.loop.assert_no_reader(7)
 
     def test_read_ready(self):
         transport = self.socket_transport()
index a1a9bb3684c3a98eb40960a31e1bfadf5453bb6c..eaafe3af8b885a444799ed7b227ed5e1ed545055 100644 (file)
@@ -327,12 +327,19 @@ class TestLoop(base_events.BaseEventLoop):
             return False
 
     def assert_reader(self, fd, callback, *args):
-        assert fd in self.readers, 'fd {} is not registered'.format(fd)
+        if fd not in self.readers:
+            raise AssertionError(f'fd {fd} is not registered')
         handle = self.readers[fd]
-        assert handle._callback == callback, '{!r} != {!r}'.format(
-            handle._callback, callback)
-        assert handle._args == args, '{!r} != {!r}'.format(
-            handle._args, args)
+        if handle._callback != callback:
+            raise AssertionError(
+                f'unexpected callback: {handle._callback} != {callback}')
+        if handle._args != args:
+            raise AssertionError(
+                f'unexpected callback args: {handle._args} != {args}')
+
+    def assert_no_reader(self, fd):
+        if fd in self.readers:
+            raise AssertionError(f'fd {fd} is registered')
 
     def _add_writer(self, fd, callback, *args):
         self.writers[fd] = events.Handle(callback, args, self)
diff --git a/Misc/NEWS.d/next/Library/2017-12-17-22-50-51.bpo-32356.roZJpA.rst b/Misc/NEWS.d/next/Library/2017-12-17-22-50-51.bpo-32356.roZJpA.rst
new file mode 100644 (file)
index 0000000..84b5381
--- /dev/null
@@ -0,0 +1,2 @@
+asyncio.transport.resume_reading() and pause_reading() are now idempotent.
+New transport.is_reading() method is added.