def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
- def testRecvInto(self):
+ def testRecvIntoArray(self):
buf = bytearray(1024)
nbytes = self.cli_conn.recv_into(buf)
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
- def _testRecvInto(self):
+ def _testRecvIntoArray(self):
buf = bytes(MSG)
self.serv_conn.send(buf)
- def testRecvFromInto(self):
+ def testRecvIntoBytearray(self):
+ buf = bytearray(1024)
+ nbytes = self.cli_conn.recv_into(buf)
+ self.assertEqual(nbytes, len(MSG))
+ msg = buf[:len(MSG)]
+ self.assertEqual(msg, MSG)
+
+ _testRecvIntoBytearray = _testRecvIntoArray
+
+ def testRecvIntoMemoryview(self):
+ buf = bytearray(1024)
+ nbytes = self.cli_conn.recv_into(memoryview(buf))
+ self.assertEqual(nbytes, len(MSG))
+ msg = buf[:len(MSG)]
+ self.assertEqual(msg, MSG)
+
+ _testRecvIntoMemoryview = _testRecvIntoArray
+
+ def testRecvFromIntoArray(self):
buf = bytearray(1024)
nbytes, addr = self.cli_conn.recvfrom_into(buf)
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
- def _testRecvFromInto(self):
+ def _testRecvFromIntoArray(self):
buf = bytes(MSG)
self.serv_conn.send(buf)
+ def testRecvFromIntoBytearray(self):
+ buf = bytearray(1024)
+ nbytes, addr = self.cli_conn.recvfrom_into(buf)
+ self.assertEqual(nbytes, len(MSG))
+ msg = buf[:len(MSG)]
+ self.assertEqual(msg, MSG)
+
+ _testRecvFromIntoBytearray = _testRecvFromIntoArray
+
+ def testRecvFromIntoMemoryview(self):
+ buf = bytearray(1024)
+ nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
+ self.assertEqual(nbytes, len(MSG))
+ msg = buf[:len(MSG)]
+ self.assertEqual(msg, MSG)
+
+ _testRecvFromIntoMemoryview = _testRecvFromIntoArray
+
TIPC_STYPE = 2000
TIPC_LOWER = 200