needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
+def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
+ cert_reqs=ssl.CERT_NONE, ca_certs=None,
+ ciphers=None, certfile=None, keyfile=None,
+ **kwargs):
+ context = ssl.SSLContext(ssl_version)
+ if cert_reqs is not None:
+ context.verify_mode = cert_reqs
+ if ca_certs is not None:
+ context.load_verify_locations(ca_certs)
+ if certfile is not None or keyfile is not None:
+ context.load_cert_chain(certfile, keyfile)
+ if ciphers is not None:
+ context.set_ciphers(ciphers)
+ return context.wrap_socket(sock, **kwargs)
+
class BasicSocketTests(unittest.TestCase):
def test_constants(self):
# Issue #7943: an SSL object doesn't create reference cycles with
# itself.
s = socket.socket(socket.AF_INET)
- ss = ssl.wrap_socket(s)
+ ss = test_wrap_socket(s)
wr = weakref.ref(ss)
with support.check_warnings(("", ResourceWarning)):
del ss
# Methods on an unconnected SSLSocket propagate the original
# OSError raise by the underlying socket object.
s = socket.socket(socket.AF_INET)
- with ssl.wrap_socket(s) as ss:
+ with test_wrap_socket(s) as ss:
self.assertRaises(OSError, ss.recv, 1)
self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
self.assertRaises(OSError, ss.recvfrom, 1)
for timeout in (None, 0.0, 5.0):
s = socket.socket(socket.AF_INET)
s.settimeout(timeout)
- with ssl.wrap_socket(s) as ss:
+ with test_wrap_socket(s) as ss:
self.assertEqual(timeout, ss.gettimeout())
- def test_errors(self):
+ def test_errors_sslwrap(self):
sock = socket.socket()
self.assertRaisesRegex(ValueError,
"certfile must be specified",
ssl.wrap_socket, sock, server_side=True)
self.assertRaisesRegex(ValueError,
"certfile must be specified for server-side operations",
- ssl.wrap_socket, sock, server_side=True, certfile="")
+ ssl.wrap_socket, sock, server_side=True, certfile="")
with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
- s.connect, (HOST, 8080))
+ s.connect, (HOST, 8080))
with self.assertRaises(OSError) as cm:
with socket.socket() as sock:
ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
sock = socket.socket()
self.addCleanup(sock.close)
with self.assertRaises(ssl.SSLError):
- ssl.wrap_socket(sock,
+ test_wrap_socket(sock,
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1)
s.listen()
c = socket.socket(socket.AF_INET)
c.connect(s.getsockname())
- with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
+ with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
with self.assertRaises(ValueError):
ss.get_channel_binding("unknown-type")
s.close()
def test_tls_unique_channel_binding(self):
# unconnected should return None for known type
s = socket.socket(socket.AF_INET)
- with ssl.wrap_socket(s) as ss:
+ with test_wrap_socket(s) as ss:
self.assertIsNone(ss.get_channel_binding("tls-unique"))
# the same for server-side
s = socket.socket(socket.AF_INET)
- with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
+ with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
self.assertIsNone(ss.get_channel_binding("tls-unique"))
def test_dealloc_warn(self):
- ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
+ ss = test_wrap_socket(socket.socket(socket.AF_INET))
r = repr(ss)
with self.assertWarns(ResourceWarning) as cm:
ss = None
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
with self.assertRaises(NotImplementedError) as cx:
- ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
+ test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
self.assertEqual(str(cx.exception), "only stream sockets are supported")
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
with self.assertRaises(NotImplementedError) as cx:
server = socket.socket(socket.AF_INET)
self.addCleanup(server.close)
port = support.bind_port(server) # Reserve port but don't listen
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
self.addCleanup(s.close)
rc = s.connect_ex((HOST, port))
self.addCleanup(server.__exit__, None, None, None)
def test_connect(self):
- with ssl.wrap_socket(socket.socket(socket.AF_INET),
+ with test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE) as s:
s.connect(self.server_addr)
self.assertEqual({}, s.getpeercert())
# this should succeed because we specify the root cert
- with ssl.wrap_socket(socket.socket(socket.AF_INET),
+ with test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SIGNING_CA) as s:
s.connect(self.server_addr)
# This should fail because we have no verification certs. Connection
# failure crashes ThreadedEchoServer, so run this in an independent
# test method.
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
self.addCleanup(s.close)
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
def test_connect_ex(self):
# Issue #11326: check connect_ex() implementation
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SIGNING_CA)
self.addCleanup(s.close)
def test_non_blocking_connect_ex(self):
# Issue #11326: non-blocking connect_ex() should allow handshake
# to proceed after the socket gets ready.
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SIGNING_CA,
do_handshake_on_connect=False)
# Issue #5238: creating a file-like object with makefile() shouldn't
# delay closing the underlying "real socket" (here tested with its
# file descriptor, hence skipping the test under Windows).
- ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
+ ss = test_wrap_socket(socket.socket(socket.AF_INET))
ss.connect(self.server_addr)
fd = ss.fileno()
f = ss.makefile()
s = socket.socket(socket.AF_INET)
s.connect(self.server_addr)
s.setblocking(False)
- s = ssl.wrap_socket(s,
+ s = test_wrap_socket(s,
cert_reqs=ssl.CERT_NONE,
do_handshake_on_connect=False)
self.addCleanup(s.close)
_test_get_server_certificate_fail(self, *self.server_addr)
def test_ciphers(self):
- with ssl.wrap_socket(socket.socket(socket.AF_INET),
+ with test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
s.connect(self.server_addr)
- with ssl.wrap_socket(socket.socket(socket.AF_INET),
+ with test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
s.connect(self.server_addr)
# Error checking can happen at instantiation or when connecting
with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
with socket.socket(socket.AF_INET) as sock:
- s = ssl.wrap_socket(sock,
+ s = test_wrap_socket(sock,
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
s.connect(self.server_addr)
# Issue #12065: on a timeout, connect_ex() should return the original
# errno (mimicking the behaviour of non-SSL sockets).
with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
do_handshake_on_connect=False)
self.addCleanup(s.close)
class ConnectionHandler (asyncore.dispatcher_with_send):
def __init__(self, conn, certfile):
- self.socket = ssl.wrap_socket(conn, server_side=True,
+ self.socket = test_wrap_socket(conn, server_side=True,
certfile=certfile,
do_handshake_on_connect=False)
asyncore.dispatcher_with_send.__init__(self, self.socket)
connectionchatty=False)
with server, \
socket.socket() as sock, \
- ssl.wrap_socket(sock,
+ test_wrap_socket(sock,
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1) as s:
try:
c.connect((HOST, port))
listener_gone.wait()
try:
- ssl_sock = ssl.wrap_socket(c)
+ ssl_sock = test_wrap_socket(c)
except OSError:
pass
else:
sys.stdout.write(
" client: read %r from server, starting TLS...\n"
% msg)
- conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
+ conn = test_wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
wrapped = True
elif indata == b"ENDTLS" and msg.startswith(b"ok"):
# ENDTLS ok, switch back to clear text
indata = b"FOO\n"
server = AsyncoreEchoServer(CERTFILE)
with server:
- s = ssl.wrap_socket(socket.socket())
+ s = test_wrap_socket(socket.socket())
s.connect(('127.0.0.1', server.port))
if support.verbose:
sys.stdout.write(
chatty=True,
connectionchatty=False)
with server:
- s = ssl.wrap_socket(socket.socket(),
+ s = test_wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
self.addCleanup(server.__exit__, None, None)
s = socket.create_connection((HOST, server.port))
self.addCleanup(s.close)
- s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
+ s = test_wrap_socket(s, suppress_ragged_eofs=False)
self.addCleanup(s.close)
# recv/read(0) should return no data
chatty=True,
connectionchatty=False)
with server:
- s = ssl.wrap_socket(socket.socket(),
+ s = test_wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
c.connect((host, port))
# Will attempt handshake and time out
self.assertRaisesRegex(socket.timeout, "timed out",
- ssl.wrap_socket, c)
+ test_wrap_socket, c)
finally:
c.close()
try:
c = socket.socket(socket.AF_INET)
- c = ssl.wrap_socket(c)
+ c = test_wrap_socket(c)
c.settimeout(0.2)
# Will attempt handshake and time out
self.assertRaisesRegex(socket.timeout, "timed out",
chatty=True,
connectionchatty=False)
with server:
- s = ssl.wrap_socket(socket.socket(),
+ s = test_wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
s.close()
# now, again
- s = ssl.wrap_socket(socket.socket(),
+ s = test_wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,