self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
s.connect, (HOST, 8080))
with self.assertRaises(IOError) as cm:
- ssl.wrap_socket(socket.socket(), certfile=WRONGCERT)
+ with socket.socket() as sock:
+ ssl.wrap_socket(sock, certfile=WRONGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
with self.assertRaises(IOError) as cm:
- ssl.wrap_socket(socket.socket(), certfile=CERTFILE, keyfile=WRONGCERT)
+ with socket.socket() as sock:
+ ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
with self.assertRaises(IOError) as cm:
- ssl.wrap_socket(socket.socket(), certfile=WRONGCERT, keyfile=WRONGCERT)
+ with socket.socket() as sock:
+ ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
def test_match_hostname(self):
def test_server_side(self):
# server_hostname doesn't work for server sockets
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- sock = socket.socket()
- self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
- server_hostname="some.hostname")
+ with socket.socket() as sock:
+ self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
+ server_hostname="some.hostname")
class ContextTests(unittest.TestCase):
s.connect(remote)
# Error checking can happen at instantiation or when connecting
with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
- s.connect(remote)
+ with socket.socket(socket.AF_INET) as sock:
+ s = ssl.wrap_socket(sock,
+ cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
+ s.connect(remote)
def test_algorithms(self):
# Issue #8484: all algorithms should be available when verifying a
# try to connect
try:
try:
- s = ssl.wrap_socket(socket.socket(),
- certfile=certfile,
- ssl_version=ssl.PROTOCOL_TLSv1)
- s.connect((HOST, server.port))
+ with socket.socket() as sock:
+ s = ssl.wrap_socket(sock,
+ certfile=certfile,
+ ssl_version=ssl.PROTOCOL_TLSv1)
+ s.connect((HOST, server.port))
except ssl.SSLError as x:
if support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x.args[1])
def listener():
s.listen(5)
listener_ready.set()
- s.accept()
+ newsock, addr = s.accept()
+ newsock.close()
s.close()
listener_gone.set()
def connector():
listener_ready.wait()
- c = socket.socket()
- c.connect((HOST, port))
- listener_gone.wait()
- try:
- ssl_sock = ssl.wrap_socket(c)
- except IOError:
- pass
- else:
- self.fail('connecting to closed SSL socket should have failed')
+ with socket.socket() as c:
+ c.connect((HOST, port))
+ listener_gone.wait()
+ try:
+ ssl_sock = ssl.wrap_socket(c)
+ except IOError:
+ pass
+ else:
+ self.fail('connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
# Let the socket hang around rather than having
# it closed by garbage collection.
conns.append(server.accept()[0])
+ for sock in conns:
+ sock.close()
t = threading.Thread(target=serve)
t.start()