srv.listen(0)
srv.close()
- @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
++ @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ def test_flowinfo(self):
+ self.assertRaises(OverflowError, socket.getnameinfo,
+ ('::1',0, 0xffffffff), 0)
+ with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
+ self.assertRaises(OverflowError, s.bind, ('::1', 0, -10))
+
+@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
+class BasicCANTest(unittest.TestCase):
+
+ def testCrucialConstants(self):
+ socket.AF_CAN
+ socket.PF_CAN
+ socket.CAN_RAW
+
+ def testCreateSocket(self):
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ pass
+
+ def testBindAny(self):
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ s.bind(('', ))
+
+ def testTooLongInterfaceName(self):
+ # most systems limit IFNAMSIZ to 16, take 1024 to be sure
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ self.assertRaisesRegex(socket.error, 'interface name too long',
+ s.bind, ('x' * 1024,))
+
+ @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
+ 'socket.CAN_RAW_LOOPBACK required for this test.')
+ def testLoopback(self):
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ for loopback in (0, 1):
+ s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
+ loopback)
+ self.assertEqual(loopback,
+ s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
+
+ @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
+ 'socket.CAN_RAW_FILTER required for this test.')
+ def testFilter(self):
+ can_id, can_mask = 0x200, 0x700
+ can_filter = struct.pack("=II", can_id, can_mask)
+ with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+ s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
+ self.assertEqual(can_filter,
+ s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
+
+
+@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class CANTest(ThreadedCANSocketTest):
+
+ """The CAN frame structure is defined in <linux/can.h>:
+
+ struct can_frame {
+ canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
+ __u8 can_dlc; /* data length code: 0 .. 8 */
+ __u8 data[8] __attribute__((aligned(8)));
+ };
+ """
+ can_frame_fmt = "=IB3x8s"
+
+ def __init__(self, methodName='runTest'):
+ ThreadedCANSocketTest.__init__(self, methodName=methodName)
+
+ @classmethod
+ def build_can_frame(cls, can_id, data):
+ """Build a CAN frame."""
+ can_dlc = len(data)
+ data = data.ljust(8, b'\x00')
+ return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
+
+ @classmethod
+ def dissect_can_frame(cls, frame):
+ """Dissect a CAN frame."""
+ can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
+ return (can_id, can_dlc, data[:can_dlc])
+
+ def testSendFrame(self):
+ cf, addr = self.s.recvfrom(self.bufsize)
+ self.assertEqual(self.cf, cf)
+ self.assertEqual(addr[0], self.interface)
+ self.assertEqual(addr[1], socket.AF_CAN)
+
+ def _testSendFrame(self):
+ self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
+ self.cli.send(self.cf)
+
+ def testSendMaxFrame(self):
+ cf, addr = self.s.recvfrom(self.bufsize)
+ self.assertEqual(self.cf, cf)
+
+ def _testSendMaxFrame(self):
+ self.cf = self.build_can_frame(0x00, b'\x07' * 8)
+ self.cli.send(self.cf)
+
+ def testSendMultiFrames(self):
+ cf, addr = self.s.recvfrom(self.bufsize)
+ self.assertEqual(self.cf1, cf)
+
+ cf, addr = self.s.recvfrom(self.bufsize)
+ self.assertEqual(self.cf2, cf)
+
+ def _testSendMultiFrames(self):
+ self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
+ self.cli.send(self.cf1)
+
+ self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
+ self.cli.send(self.cf2)
+
+
+@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
+class BasicRDSTest(unittest.TestCase):
+
+ def testCrucialConstants(self):
+ socket.AF_RDS
+ socket.PF_RDS
+
+ def testCreateSocket(self):
+ with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
+ pass
+
+ def testSocketBufferSize(self):
+ bufsize = 16384
+ with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
+
+
+@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RDSTest(ThreadedRDSSocketTest):
+
+ def __init__(self, methodName='runTest'):
+ ThreadedRDSSocketTest.__init__(self, methodName=methodName)
+
+ def setUp(self):
+ super().setUp()
+ self.evt = threading.Event()
+
+ def testSendAndRecv(self):
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data, data)
+ self.assertEqual(self.cli_addr, addr)
+
+ def _testSendAndRecv(self):
+ self.data = b'spam'
+ self.cli.sendto(self.data, 0, (HOST, self.port))
+
+ def testPeek(self):
+ data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
+ self.assertEqual(self.data, data)
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data, data)
+
+ def _testPeek(self):
+ self.data = b'spam'
+ self.cli.sendto(self.data, 0, (HOST, self.port))
+
+ @requireAttrs(socket.socket, 'recvmsg')
+ def testSendAndRecvMsg(self):
+ data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
+ self.assertEqual(self.data, data)
+
+ @requireAttrs(socket.socket, 'sendmsg')
+ def _testSendAndRecvMsg(self):
+ self.data = b'hello ' * 10
+ self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
+
+ def testSendAndRecvMulti(self):
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data1, data)
+
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data2, data)
+
+ def _testSendAndRecvMulti(self):
+ self.data1 = b'bacon'
+ self.cli.sendto(self.data1, 0, (HOST, self.port))
+
+ self.data2 = b'egg'
+ self.cli.sendto(self.data2, 0, (HOST, self.port))
+
+ def testSelect(self):
+ r, w, x = select.select([self.serv], [], [], 3.0)
+ self.assertIn(self.serv, r)
+ data, addr = self.serv.recvfrom(self.bufsize)
+ self.assertEqual(self.data, data)
+
+ def _testSelect(self):
+ self.data = b'select'
+ self.cli.sendto(self.data, 0, (HOST, self.port))
+
+ def testCongestion(self):
+ # wait until the sender is done
+ self.evt.wait()
+
+ def _testCongestion(self):
+ # test the behavior in case of congestion
+ self.data = b'fill'
+ self.cli.setblocking(False)
+ try:
+ # try to lower the receiver's socket buffer size
+ self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384)
+ except OSError:
+ pass
+ with self.assertRaises(OSError) as cm:
+ try:
+ # fill the receiver's socket buffer
+ while True:
+ self.cli.sendto(self.data, 0, (HOST, self.port))
+ finally:
+ # signal the receiver we're done
+ self.evt.set()
+ # sendto() should have failed with ENOBUFS
+ self.assertEqual(cm.exception.errno, errno.ENOBUFS)
+ # and we should have received a congestion notification through poll
+ r, w, x = select.select([self.serv], [], [], 3.0)
+ self.assertIn(self.serv, r)
+
+
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):