]> granicus.if.org Git - python/commitdiff
asyncio, tulip issue 202: Add unit test of pause/resume writing for proactor
authorVictor Stinner <victor.stinner@gmail.com>
Thu, 11 Dec 2014 21:23:19 +0000 (22:23 +0100)
committerVictor Stinner <victor.stinner@gmail.com>
Thu, 11 Dec 2014 21:23:19 +0000 (22:23 +0100)
socket transport

Lib/asyncio/proactor_events.py
Lib/test/test_asyncio/test_proactor_events.py

index 4c527aa262a5d2b2e372662467839af30c5193e9..e67cf65a10f01c33c9a9b7cb360f472c78e21b3e 100644 (file)
@@ -230,10 +230,6 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
             assert self._buffer is None
             # Pass a copy, except if it's already immutable.
             self._loop_writing(data=bytes(data))
-            # XXX Should we pause the protocol at this point
-            # if len(data) > self._high_water?  (That would
-            # require keeping track of the number of bytes passed
-            # to a send() that hasn't finished yet.)
         elif not self._buffer:  # WRITING -> BACKED UP
             # Make a mutable copy which we can extend.
             self._buffer = bytearray(data)
index 0c536986ff7f157ee7a4b35d44120d2623441ce8..9e9b41a47fd7858d86c48233da4f2e81ab05ea3e 100644 (file)
@@ -343,6 +343,88 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         tr.close()
 
 
+    def pause_writing_transport(self, high):
+        tr = _ProactorSocketTransport(
+            self.loop, self.sock, self.protocol)
+        self.addCleanup(tr.close)
+
+        tr.set_write_buffer_limits(high=high)
+
+        self.assertEqual(tr.get_write_buffer_size(), 0)
+        self.assertFalse(self.protocol.pause_writing.called)
+        self.assertFalse(self.protocol.resume_writing.called)
+        return tr
+
+    def test_pause_resume_writing(self):
+        tr = self.pause_writing_transport(high=4)
+
+        # write a large chunk, must pause writing
+        fut = asyncio.Future(loop=self.loop)
+        self.loop._proactor.send.return_value = fut
+        tr.write(b'large data')
+        self.loop._run_once()
+        self.assertTrue(self.protocol.pause_writing.called)
+
+        # flush the buffer
+        fut.set_result(None)
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 0)
+        self.assertTrue(self.protocol.resume_writing.called)
+
+    def test_pause_writing_2write(self):
+        tr = self.pause_writing_transport(high=4)
+
+        # first short write, the buffer is not full (3 <= 4)
+        fut1 = asyncio.Future(loop=self.loop)
+        self.loop._proactor.send.return_value = fut1
+        tr.write(b'123')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 3)
+        self.assertFalse(self.protocol.pause_writing.called)
+
+        # fill the buffer, must pause writing (6 > 4)
+        tr.write(b'abc')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 6)
+        self.assertTrue(self.protocol.pause_writing.called)
+
+    def test_pause_writing_3write(self):
+        tr = self.pause_writing_transport(high=4)
+
+        # first short write, the buffer is not full (1 <= 4)
+        fut = asyncio.Future(loop=self.loop)
+        self.loop._proactor.send.return_value = fut
+        tr.write(b'1')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 1)
+        self.assertFalse(self.protocol.pause_writing.called)
+
+        # second short write, the buffer is not full (3 <= 4)
+        tr.write(b'23')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 3)
+        self.assertFalse(self.protocol.pause_writing.called)
+
+        # fill the buffer, must pause writing (6 > 4)
+        tr.write(b'abc')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 6)
+        self.assertTrue(self.protocol.pause_writing.called)
+
+    def test_dont_pause_writing(self):
+        tr = self.pause_writing_transport(high=4)
+
+        # write a large chunk which completes immedialty,
+        # it should not pause writing
+        fut = asyncio.Future(loop=self.loop)
+        fut.set_result(None)
+        self.loop._proactor.send.return_value = fut
+        tr.write(b'very large data')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 0)
+        self.assertFalse(self.protocol.pause_writing.called)
+
+
 class BaseProactorEventLoopTests(test_utils.TestCase):
 
     def setUp(self):