self.assertFalse(self.protocol.pause_writing.called)
-@unittest.skip('FIXME: bpo-33694: these tests are too close '
- 'to the implementation and should be refactored or removed')
-class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):
-
- def setUp(self):
- super().setUp()
- self.loop = self.new_test_loop()
- self.addCleanup(self.loop.close)
- self.proactor = mock.Mock()
- self.loop._proactor = self.proactor
-
- self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
- self.buf = bytearray(1)
- self.protocol.get_buffer.side_effect = lambda hint: self.buf
-
- self.sock = mock.Mock(socket.socket)
-
- def socket_transport(self, waiter=None):
- transport = _ProactorSocketTransport(self.loop, self.sock,
- self.protocol, waiter=waiter)
- self.addCleanup(close_transport, transport)
- return transport
-
- def test_ctor(self):
- fut = self.loop.create_future()
- tr = self.socket_transport(waiter=fut)
- test_utils.run_briefly(self.loop)
- self.assertIsNone(fut.result())
- self.protocol.connection_made(tr)
- self.proactor.recv_into.assert_called_with(self.sock, self.buf)
-
- def test_loop_reading(self):
- tr = self.socket_transport()
- tr._loop_reading()
- self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
- self.assertTrue(self.protocol.get_buffer.called)
- self.assertFalse(self.protocol.buffer_updated.called)
- self.assertFalse(self.protocol.eof_received.called)
-
- def test_get_buffer_error(self):
- transport = self.socket_transport()
- transport._fatal_error = mock.Mock()
-
- self.loop.call_exception_handler = mock.Mock()
- self.protocol.get_buffer.side_effect = LookupError()
-
- transport._loop_reading()
-
- self.assertTrue(transport._fatal_error.called)
- self.assertTrue(self.protocol.get_buffer.called)
- self.assertFalse(self.protocol.buffer_updated.called)
-
- def test_get_buffer_zerosized(self):
- transport = self.socket_transport()
- transport._fatal_error = mock.Mock()
-
- self.loop.call_exception_handler = mock.Mock()
- self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)
-
- transport._loop_reading()
-
- self.assertTrue(transport._fatal_error.called)
- self.assertTrue(self.protocol.get_buffer.called)
- self.assertFalse(self.protocol.buffer_updated.called)
-
- def test_proto_type_switch(self):
- self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
- tr = self.socket_transport()
-
- res = self.loop.create_future()
- res.set_result(b'data')
-
- tr = self.socket_transport()
- tr._read_fut = res
- tr._loop_reading(res)
- self.loop._proactor.recv.assert_called_with(self.sock, 32768)
- self.protocol.data_received.assert_called_with(b'data')
-
- # switch protocol to a BufferedProtocol
-
- buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
- buf = bytearray(4)
- buf_proto.get_buffer.side_effect = lambda hint: buf
-
- tr.set_protocol(buf_proto)
- test_utils.run_briefly(self.loop)
- res = self.loop.create_future()
- res.set_result(4)
-
- tr._read_fut = res
- tr._loop_reading(res)
- self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
- buf_proto.buffer_updated.assert_called_with(4)
-
- @unittest.skip('FIXME: bpo-33694: this test is too close to the '
- 'implementation and should be refactored or removed')
- def test_proto_buf_switch(self):
- tr = self.socket_transport()
- test_utils.run_briefly(self.loop)
- self.protocol.get_buffer.assert_called_with(-1)
-
- # switch protocol to *another* BufferedProtocol
-
- buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
- buf = bytearray(4)
- buf_proto.get_buffer.side_effect = lambda hint: buf
- tr._read_fut.done.side_effect = lambda: False
- tr.set_protocol(buf_proto)
- self.assertFalse(buf_proto.get_buffer.called)
- test_utils.run_briefly(self.loop)
- buf_proto.get_buffer.assert_called_with(-1)
-
- def test_buffer_updated_error(self):
- transport = self.socket_transport()
- transport._fatal_error = mock.Mock()
-
- self.loop.call_exception_handler = mock.Mock()
- self.protocol.buffer_updated.side_effect = LookupError()
-
- res = self.loop.create_future()
- res.set_result(10)
- transport._read_fut = res
- transport._loop_reading(res)
-
- self.assertTrue(transport._fatal_error.called)
- self.assertFalse(self.protocol.get_buffer.called)
- self.assertTrue(self.protocol.buffer_updated.called)
-
- def test_loop_eof_received_error(self):
- res = self.loop.create_future()
- res.set_result(0)
-
- self.protocol.eof_received.side_effect = LookupError()
-
- tr = self.socket_transport()
- tr._fatal_error = mock.Mock()
-
- tr.close = mock.Mock()
- tr._read_fut = res
- tr._loop_reading(res)
- self.assertFalse(self.loop._proactor.recv_into.called)
- self.assertTrue(self.protocol.eof_received.called)
- self.assertTrue(tr._fatal_error.called)
-
- def test_loop_reading_data(self):
- res = self.loop.create_future()
- res.set_result(4)
-
- tr = self.socket_transport()
- tr._read_fut = res
- tr._loop_reading(res)
- self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
- self.protocol.buffer_updated.assert_called_with(4)
-
- def test_loop_reading_no_data(self):
- res = self.loop.create_future()
- res.set_result(0)
-
- tr = self.socket_transport()
- self.assertRaises(AssertionError, tr._loop_reading, res)
-
- tr.close = mock.Mock()
- tr._read_fut = res
- tr._loop_reading(res)
- self.assertFalse(self.loop._proactor.recv_into.called)
- self.assertTrue(self.protocol.eof_received.called)
- self.assertTrue(tr.close.called)
-
- def test_loop_reading_aborted(self):
- err = self.loop._proactor.recv_into.side_effect = \
- ConnectionAbortedError()
-
- tr = self.socket_transport()
- tr._fatal_error = mock.Mock()
- tr._loop_reading()
- tr._fatal_error.assert_called_with(
- err, 'Fatal read error on pipe transport')
-
- def test_loop_reading_aborted_closing(self):
- self.loop._proactor.recv.side_effect = ConnectionAbortedError()
-
- tr = self.socket_transport()
- tr._closing = True
- tr._fatal_error = mock.Mock()
- tr._loop_reading()
- self.assertFalse(tr._fatal_error.called)
-
- def test_loop_reading_aborted_is_fatal(self):
- self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
- tr = self.socket_transport()
- tr._closing = False
- tr._fatal_error = mock.Mock()
- tr._loop_reading()
- self.assertTrue(tr._fatal_error.called)
-
- def test_loop_reading_conn_reset_lost(self):
- err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()
-
- tr = self.socket_transport()
- tr._closing = False
- tr._fatal_error = mock.Mock()
- tr._force_close = mock.Mock()
- tr._loop_reading()
- self.assertFalse(tr._fatal_error.called)
- tr._force_close.assert_called_with(err)
-
- def test_loop_reading_exception(self):
- err = self.loop._proactor.recv_into.side_effect = OSError()
-
- tr = self.socket_transport()
- tr._fatal_error = mock.Mock()
- tr._loop_reading()
- tr._fatal_error.assert_called_with(
- err, 'Fatal read error on pipe transport')
-
- def test_pause_resume_reading(self):
- tr = self.socket_transport()
- futures = []
- for msg in [10, 20, 30, 40, 0]:
- f = self.loop.create_future()
- f.set_result(msg)
- futures.append(f)
-
- self.loop._proactor.recv_into.side_effect = futures
- self.loop._run_once()
- self.assertFalse(tr._paused)
- self.assertTrue(tr.is_reading())
- self.loop._run_once()
- self.protocol.buffer_updated.assert_called_with(10)
- self.loop._run_once()
- self.protocol.buffer_updated.assert_called_with(20)
-
- tr.pause_reading()
- tr.pause_reading()
- self.assertTrue(tr._paused)
- self.assertFalse(tr.is_reading())
- for i in range(10):
- self.loop._run_once()
- self.protocol.buffer_updated.assert_called_with(20)
-
- tr.resume_reading()
- tr.resume_reading()
- self.assertFalse(tr._paused)
- self.assertTrue(tr.is_reading())
- self.loop._run_once()
- self.protocol.buffer_updated.assert_called_with(30)
- self.loop._run_once()
- self.protocol.buffer_updated.assert_called_with(40)
- tr.close()
-
- self.assertFalse(tr.is_reading())
-
-
class ProactorDatagramTransportTests(test_utils.TestCase):
def setUp(self):