else:
return socket.send(self, data, flags)
- def sendto(self, data, addr, flags=0):
+ def sendto(self, data, flags_or_addr, addr=None):
self._checkClosed()
if self._sslobj:
raise ValueError("sendto not allowed on instances of %s" %
self.__class__)
+ elif addr is None:
+ return socket.sendto(self, data, flags_or_addr)
else:
- return socket.sendto(self, data, addr, flags)
+ return socket.sendto(self, data, flags_or_addr, addr)
def sendall(self, data, flags=0):
self._checkClosed()
else:
return socket.recv_into(self, buffer, nbytes, flags)
- def recvfrom(self, addr, buflen=1024, flags=0):
+ def recvfrom(self, buflen=1024, flags=0):
self._checkClosed()
if self._sslobj:
raise ValueError("recvfrom not allowed on instances of %s" %
self.__class__)
else:
- return socket.recvfrom(self, addr, buflen, flags)
+ return socket.recvfrom(self, buflen, flags)
def recvfrom_into(self, buffer, nbytes=None, flags=0):
self._checkClosed()
del ss
self.assertEqual(wr(), None)
+ def test_wrapped_unconnected(self):
+ # Methods on an unconnected SSLSocket propagate the original
+ # socket.error raise by the underlying socket object.
+ s = socket.socket(socket.AF_INET)
+ ss = ssl.wrap_socket(s)
+ self.assertRaises(socket.error, ss.recv, 1)
+ self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
+ self.assertRaises(socket.error, ss.recvfrom, 1)
+ self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
+ self.assertRaises(socket.error, ss.send, b'x')
+ self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
+
def test_timeout(self):
# Issue #8524: when creating an SSL socket, the timeout of the
# original socket should be retained.