+++ /dev/null
-# Test just the SSL support in the socket module, in a moderately bogus way.
-import sys
-import unittest
-from test import test_support
-import socket
-import errno
-import threading
-import subprocess
-import time
-import os
-import urllib
-import warnings
- 'ignore',
- 'socket.ssl.. is deprecated. Use ssl.wrap_socket.. instead.',
- DeprecationWarning)
-# Optionally test SSL support, if we have it in the tested platform
-skip_expected = not hasattr(socket, "ssl")
-HOST = test_support.HOST
-class ConnectedTests(unittest.TestCase):
- def urlopen(self, host, *args, **kwargs):
- # Connecting to remote hosts is flaky. Make it more robust
- # by retrying the connection several times.
- for i in range(3):
- try:
- return urllib.urlopen(host, *args, **kwargs)
- except IOError, e:
- last_exc = e
- continue
- except:
- raise
- raise last_exc
- def testBasic(self):
- socket.RAND_status()
- try:
- socket.RAND_egd(1)
- except TypeError:
- pass
- else:
- print "didn't raise TypeError"
- socket.RAND_add("this is a random string", 75.0)
- with test_support.transient_internet():
- f = self.urlopen('https://sourceforge.net')
- buf = f.read()
- f.close()
- def testTimeout(self):
- def error_msg(extra_msg):
- print >> sys.stderr, """\
- WARNING: an attempt to connect to %r %s, in
- testTimeout. That may be legitimate, but is not the outcome we
- hoped for. If this message is seen often, testTimeout should be
- changed to use a more reliable address.""" % (ADDR, extra_msg)
- # A service which issues a welcome banner (without need to write
- # anything).
- ADDR = "pop.gmail.com", 995
- s = socket.socket()
- s.settimeout(30.0)
- try:
- s.connect(ADDR)
- except socket.timeout:
- error_msg('timed out')
- return
- except socket.error, exc: # In case connection is refused.
- if exc.args[0] == errno.ECONNREFUSED:
- error_msg('was refused')
- return
- else:
- raise
- ss = socket.ssl(s)
- # Read part of return welcome banner twice.
- ss.read(1)
- ss.read(1)
- s.close()
-class BasicTests(unittest.TestCase):
- def testRudeShutdown(self):
- listener_ready = threading.Event()
- listener_gone = threading.Event()
- sock = socket.socket()
- port = test_support.bind_port(sock)
- # `listener` runs in a thread. It opens a socket and sits in accept()
- # until the main thread connects. Then it rudely closes the socket,
- # and sets Event `listener_gone` to let the main thread know the socket
- # is gone.
- def listener(s):
- s.listen(5)
- listener_ready.set()
- s.accept()
- s = None # reclaim the socket object, which also closes it
- listener_gone.set()
- def connector():
- listener_ready.wait()
- s = socket.socket()
- s.connect((HOST, port))
- listener_gone.wait()
- try:
- ssl_sock = socket.ssl(s)
- except socket.sslerror:
- pass
- else:
- raise test_support.TestFailed(
- 'connecting to closed SSL socket should have failed')
- t = threading.Thread(target=listener, args=(sock,))
- t.start()
- connector()
- t.join()
- def connect(self, s, host_port):
- # Connecting to remote hosts is flaky. Make it more robust
- # by retrying the connection several times.
- for i in range(3):
- try:
- return s.connect(host_port)
- except IOError, e:
- last_exc = e
- continue
- except:
- raise
- raise last_exc
- def test_978833(self):
- if not test_support.is_resource_enabled("network"):
- return
- if test_support.verbose:
- print "test_978833 ..."
- import os, httplib, ssl
- with test_support.transient_internet():
- s = socket.socket(socket.AF_INET)
- try:
- self.connect(s, ("svn.python.org", 443))
- except IOError:
- print >> sys.stderr, """\
- WARNING: an attempt to connect to svn.python.org:443 failed, in
- test_978833. That may be legitimate, but is not the outcome we
- hoped for. If this message is seen often, test_978833 should be
- changed to use a more reliable address."""
- return
- fd = s._sock.fileno()
- sock = ssl.wrap_socket(s)
- s = None
- sock.close()
- try:
- os.fstat(fd)
- except OSError:
- pass
- else:
- raise test_support.TestFailed("Failed to close socket")
-class OpenSSLTests(unittest.TestCase):
- def testBasic(self):
- s = socket.socket()
- s.connect((HOST, OpenSSLServer.PORT))
- ss = socket.ssl(s)
- ss.write("Foo\n")
- i = ss.read(4)
- self.assertEqual(i, "Foo\n")
- s.close()
- def testMethods(self):
- # read & write is already tried in the Basic test
- # now we'll try to get the server info about certificates
- # this came from the certificate I used, one I found in /usr/share/openssl
- info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi"
- s = socket.socket()
- s.connect((HOST, OpenSSLServer.PORT))
- ss = socket.ssl(s)
- cert = ss.server()
- self.assertEqual(cert, info)
- cert = ss.issuer()
- self.assertEqual(cert, info)
- s.close()
-class OpenSSLServer(threading.Thread):
- PORT = None
- def __init__(self):
- self.s = None
- self.keepServing = True
- self._external()
- if self.haveServer:
- threading.Thread.__init__(self)
- def _external(self):
- # let's find the .pem files
- curdir = os.path.dirname(__file__) or os.curdir
- cert_file = os.path.join(curdir, "ssl_cert.pem")
- if not os.access(cert_file, os.F_OK):
- raise ValueError("No cert file found! (tried %r)" % cert_file)
- key_file = os.path.join(curdir, "ssl_key.pem")
- if not os.access(key_file, os.F_OK):
- raise ValueError("No key file found! (tried %r)" % key_file)
- try:
- # XXX TODO: on Windows, this should make more effort to use the
- # openssl.exe that would have been built by the pcbuild.sln.
- OpenSSLServer.PORT = test_support.find_unused_port()
- args = (OpenSSLServer.PORT, cert_file, key_file)
- cmd = "openssl s_server -accept %d -cert %s -key %s -quiet" % args
- self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- time.sleep(1)
- except:
- self.haveServer = False
- else:
- # let's try if it is actually up
- try:
- s = socket.socket()
- s.connect((HOST, OpenSSLServer.PORT))
- s.close()
- if self.s.stdout.readline() != "ERROR\n":
- raise ValueError
- except:
- self.haveServer = False
- else:
- self.haveServer = True
- def run(self):
- while self.keepServing:
- time.sleep(.5)
- l = self.s.stdout.readline()
- self.s.stdin.write(l)
- def shutdown(self):
- self.keepServing = False
- if not self.s:
- return
- if sys.platform == "win32":
- subprocess.TerminateProcess(int(self.s._handle), -1)
- else:
- os.kill(self.s.pid, 15)
-def test_main():
- if not hasattr(socket, "ssl"):
- raise test_support.TestSkipped("socket module has no ssl support")
- tests = [BasicTests]
- if test_support.is_resource_enabled('network'):
- tests.append(ConnectedTests)
- # in these platforms we can kill the openssl process
- if sys.platform in ("sunos5", "darwin", "linux1",
- "linux2", "win32", "hp-ux11"):
- server = OpenSSLServer()
- if server.haveServer:
- tests.append(OpenSSLTests)
- server.start()
- else:
- server = None
- thread_info = test_support.threading_setup()
- try:
- test_support.run_unittest(*tests)
- finally:
- if server is not None and server.haveServer:
- server.shutdown()
- test_support.threading_cleanup(*thread_info)
-if __name__ == "__main__":
- test_main()