--- /dev/null
+-----BEGIN CERTIFICATE-----
+MIICLDCCAdYCAQAwDQYJKoZIhvcNAQEEBQAwgaAxCzAJBgNVBAYTAlBUMRMwEQYD
+VQQIEwpRdWVlbnNsYW5kMQ8wDQYDVQQHEwZMaXNib2ExFzAVBgNVBAoTDk5ldXJv
+bmlvLCBMZGEuMRgwFgYDVQQLEw9EZXNlbnZvbHZpbWVudG8xGzAZBgNVBAMTEmJy
+dXR1cy5uZXVyb25pby5wdDEbMBkGCSqGSIb3DQEJARYMc2FtcG9AaWtpLmZpMB4X
+DTk2MDkwNTAzNDI0M1oXDTk2MTAwNTAzNDI0M1owgaAxCzAJBgNVBAYTAlBUMRMw
+EQYDVQQIEwpRdWVlbnNsYW5kMQ8wDQYDVQQHEwZMaXNib2ExFzAVBgNVBAoTDk5l
+dXJvbmlvLCBMZGEuMRgwFgYDVQQLEw9EZXNlbnZvbHZpbWVudG8xGzAZBgNVBAMT
+EmJydXR1cy5uZXVyb25pby5wdDEbMBkGCSqGSIb3DQEJARYMc2FtcG9AaWtpLmZp
+MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAL7+aty3S1iBA/+yxjxv4q1MUTd1kjNw
+L4lYKbpzzlmC5beaQXeQ2RmGMTXU+mDvuqItjVHOK3DvPK7lTcSGftUCAwEAATAN
+BgkqhkiG9w0BAQQFAANBAFqPEKFjk6T6CKTHvaQeEAsX0/8YHPHqH/9AnhSjrwuX
+9EBc0n6bVGhN7XaXd6sJ7dym9sbsWxb+pJdurnkxjx4=
+-----END CERTIFICATE-----
import threading
import subprocess
import time
+import ctypes
+import os
+import urllib
# Optionally test SSL support, if we have it in the tested platform
skip_expected = not hasattr(socket, "ssl")
class ConnectedTests(unittest.TestCase):
def testBasic(self):
- import urllib
-
- if test_support.verbose:
- print "test_basic ..."
-
socket.RAND_status()
try:
socket.RAND_egd(1)
hoped for. If this message is seen often, test_timeout should be
changed to use a more reliable address.""" % (ADDR, extra_msg)
- if test_support.verbose:
- print "test_timeout ..."
-
# A service which issues a welcome banner (without need to write
# anything).
# XXX ("gmail.org", 995) has been unreliable so far, from time to
class BasicTests(unittest.TestCase):
def testRudeShutdown(self):
- if test_support.verbose:
- print "test_rude_shutdown ..."
-
# Some random port to connect to.
PORT = [9934]
connector()
t.join()
+class OpenSSLTests(unittest.TestCase):
+
+ def testBasic(self):
+ time.sleep(.2)
+ s = socket.socket()
+ s.connect(("localhost", 4433))
+ ss = socket.ssl(s)
+ ss.write("Foo\n")
+ i = ss.read(4)
+ self.assertEqual(i, "Foo\n")
+
+
+def haveOpenSSL():
+ try:
+ s = subprocess.Popen("openssl rand 1".split(), stdout=subprocess.PIPE)
+ s.stdout.read(1)
+ except OSError, err:
+ if err.errno == 2:
+ return False
+ raise
+ return True
+
+class OpenSSLServer(threading.Thread):
+ def __init__(self):
+ self.s = None
+ self.keepServing = True
+ threading.Thread.__init__(self)
+ def run(self):
+ if os.access("ssl_cert.pem", os.F_OK):
+ cert_file = "ssl_cert.pem"
+ elif os.access("./Lib/test/ssl_cert.pem", os.F_OK):
+ cert_file = "./Lib/test/ssl_cert.pem"
+ else:
+ raise ValueError("No cert file found!")
+ if os.access("ssl_key.pem", os.F_OK):
+ key_file = "ssl_key.pem"
+ elif os.access("./Lib/test/ssl_key.pem", os.F_OK):
+ key_file = "./Lib/test/ssl_key.pem"
+ else:
+ raise ValueError("No cert file found!")
+
+ cmd = "openssl s_server -cert %s -key %s -quiet" % (cert_file, key_file)
+ self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ while self.keepServing:
+ time.sleep(.5)
+ l = self.s.stdout.readline()
+ self.s.stdin.write(l)
+
+ def shutdown(self):
+ self.keepServing = False
+ if not self.s:
+ return
+ if sys.platform == "win32":
+ handle = ctypes.windll.kernel32.OpenProcess(1, False, self.s.pid)
+ ctypes.windll.kernel32.TerminateProcess(handle, -1)
+ ctypes.windll.kernel32.CloseHandle(handle)
+ else:
+ os.kill(self.s.pid, 15)
+
def test_main():
if not hasattr(socket, "ssl"):
raise test_support.TestSkipped("socket module has no ssl support")
if test_support.is_resource_enabled('network'):
tests.append(ConnectedTests)
+ # in these platforms we can kill the openssl process
+ if sys.platform in ("sunos5", "darwin", "linux1",
+ "linux2", "win32", "hp-ux11"):
+ if haveOpenSSL():
+ haveServer = True
+ tests.append(OpenSSLTests)
+ else:
+ haveServer = False
+
+ if haveServer:
+ server = OpenSSLServer()
+ server.start()
+
thread_info = test_support.threading_setup()
test_support.run_unittest(*tests)
test_support.threading_cleanup(*thread_info)
+ if haveServer:
+ server.shutdown()
+
if __name__ == "__main__":
test_main()