]> granicus.if.org Git - python/commitdiff
Added a SSL server to test_socket_ssl.py to be able to test
authorFacundo Batista <facundobatista@gmail.com>
Tue, 3 Apr 2007 17:29:48 +0000 (17:29 +0000)
committerFacundo Batista <facundobatista@gmail.com>
Tue, 3 Apr 2007 17:29:48 +0000 (17:29 +0000)
locally. Now, it checks if have openssl available and run
those specific tests (it starts openssl at the beggining of
all the tests and then kills it at the end).

Lib/test/ssl_cert.pem [new file with mode: 0644]
Lib/test/ssl_key.pem [new file with mode: 0644]
Lib/test/test_socket_ssl.py

diff --git a/Lib/test/ssl_cert.pem b/Lib/test/ssl_cert.pem
new file mode 100644 (file)
index 0000000..9d7ac23
--- /dev/null
@@ -0,0 +1,14 @@
+-----BEGIN CERTIFICATE-----
+MIICLDCCAdYCAQAwDQYJKoZIhvcNAQEEBQAwgaAxCzAJBgNVBAYTAlBUMRMwEQYD
+VQQIEwpRdWVlbnNsYW5kMQ8wDQYDVQQHEwZMaXNib2ExFzAVBgNVBAoTDk5ldXJv
+bmlvLCBMZGEuMRgwFgYDVQQLEw9EZXNlbnZvbHZpbWVudG8xGzAZBgNVBAMTEmJy
+dXR1cy5uZXVyb25pby5wdDEbMBkGCSqGSIb3DQEJARYMc2FtcG9AaWtpLmZpMB4X
+DTk2MDkwNTAzNDI0M1oXDTk2MTAwNTAzNDI0M1owgaAxCzAJBgNVBAYTAlBUMRMw
+EQYDVQQIEwpRdWVlbnNsYW5kMQ8wDQYDVQQHEwZMaXNib2ExFzAVBgNVBAoTDk5l
+dXJvbmlvLCBMZGEuMRgwFgYDVQQLEw9EZXNlbnZvbHZpbWVudG8xGzAZBgNVBAMT
+EmJydXR1cy5uZXVyb25pby5wdDEbMBkGCSqGSIb3DQEJARYMc2FtcG9AaWtpLmZp
+MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAL7+aty3S1iBA/+yxjxv4q1MUTd1kjNw
+L4lYKbpzzlmC5beaQXeQ2RmGMTXU+mDvuqItjVHOK3DvPK7lTcSGftUCAwEAATAN
+BgkqhkiG9w0BAQQFAANBAFqPEKFjk6T6CKTHvaQeEAsX0/8YHPHqH/9AnhSjrwuX
+9EBc0n6bVGhN7XaXd6sJ7dym9sbsWxb+pJdurnkxjx4=
+-----END CERTIFICATE-----
diff --git a/Lib/test/ssl_key.pem b/Lib/test/ssl_key.pem
new file mode 100644 (file)
index 0000000..239ad66
--- /dev/null
@@ -0,0 +1,9 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIBPAIBAAJBAL7+aty3S1iBA/+yxjxv4q1MUTd1kjNwL4lYKbpzzlmC5beaQXeQ
+2RmGMTXU+mDvuqItjVHOK3DvPK7lTcSGftUCAwEAAQJBALjkK+jc2+iihI98riEF
+oudmkNziSRTYjnwjx8mCoAjPWviB3c742eO3FG4/soi1jD9A5alihEOXfUzloenr
+8IECIQD3B5+0l+68BA/6d76iUNqAAV8djGTzvxnCxycnxPQydQIhAMXt4trUI3nc
+a+U8YL2HPFA3gmhBsSICbq2OptOCnM7hAiEA6Xi3JIQECob8YwkRj29DU3/4WYD7
+WLPgsQpwo1GuSpECICGsnWH5oaeD9t9jbFoSfhJvv0IZmxdcLpRcpslpeWBBAiEA
+6/5B8J0GHdJq89FHwEG/H2eVVUYu5y/aD6sgcm+0Avg=
+-----END RSA PRIVATE KEY-----
index 4c502d1d2481395bd23e0ee4fc3feb4aa73ab04f..86b194377a24a5fd270fc8975c7dec7faea7894b 100644 (file)
@@ -8,6 +8,9 @@ import errno
 import threading
 import subprocess
 import time
+import ctypes
+import os
+import urllib
 
 # Optionally test SSL support, if we have it in the tested platform
 skip_expected = not hasattr(socket, "ssl")
@@ -15,11 +18,6 @@ skip_expected = not hasattr(socket, "ssl")
 class ConnectedTests(unittest.TestCase):
 
     def testBasic(self):
-        import urllib
-    
-        if test_support.verbose:
-            print "test_basic ..."
-    
         socket.RAND_status()
         try:
             socket.RAND_egd(1)
@@ -42,9 +40,6 @@ class ConnectedTests(unittest.TestCase):
         hoped for.  If this message is seen often, test_timeout should be
         changed to use a more reliable address.""" % (ADDR, extra_msg)
     
-        if test_support.verbose:
-            print "test_timeout ..."
-    
         # A service which issues a welcome banner (without need to write
         # anything).
         # XXX ("gmail.org", 995) has been unreliable so far, from time to
@@ -75,9 +70,6 @@ class ConnectedTests(unittest.TestCase):
 class BasicTests(unittest.TestCase):
 
     def testRudeShutdown(self):
-        if test_support.verbose:
-            print "test_rude_shutdown ..."
-    
         # Some random port to connect to.
         PORT = [9934]
     
@@ -115,7 +107,68 @@ class BasicTests(unittest.TestCase):
         connector()
         t.join()
 
+class OpenSSLTests(unittest.TestCase):
+
+    def testBasic(self):
+        time.sleep(.2)
+        s = socket.socket()
+        s.connect(("localhost", 4433))
+        ss = socket.ssl(s)
+        ss.write("Foo\n")
+        i = ss.read(4)
+        self.assertEqual(i, "Foo\n")
+
+
+def haveOpenSSL():
+    try:
+        s = subprocess.Popen("openssl rand 1".split(), stdout=subprocess.PIPE)
+        s.stdout.read(1)
+    except OSError, err:
+        if err.errno == 2:
+            return False
+        raise
+    return True
+
+class OpenSSLServer(threading.Thread):
+    def __init__(self):
+        self.s = None
+        self.keepServing = True
+        threading.Thread.__init__(self)
 
+    def run(self):
+        if os.access("ssl_cert.pem", os.F_OK):
+            cert_file = "ssl_cert.pem"
+        elif os.access("./Lib/test/ssl_cert.pem", os.F_OK):
+            cert_file = "./Lib/test/ssl_cert.pem"
+        else:
+            raise ValueError("No cert file found!")
+        if os.access("ssl_key.pem", os.F_OK):
+            key_file = "ssl_key.pem"
+        elif os.access("./Lib/test/ssl_key.pem", os.F_OK):
+            key_file = "./Lib/test/ssl_key.pem"
+        else:
+            raise ValueError("No cert file found!")
+
+        cmd = "openssl s_server -cert %s -key %s -quiet" % (cert_file, key_file)
+        self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE, 
+                                       stdout=subprocess.PIPE, 
+                                       stderr=subprocess.STDOUT)
+        while self.keepServing:
+            time.sleep(.5)
+            l = self.s.stdout.readline()
+            self.s.stdin.write(l)
+
+    def shutdown(self):
+        self.keepServing = False
+        if not self.s:
+            return
+        if sys.platform == "win32":
+            handle = ctypes.windll.kernel32.OpenProcess(1, False, self.s.pid)
+            ctypes.windll.kernel32.TerminateProcess(handle, -1)
+            ctypes.windll.kernel32.CloseHandle(handle)
+        else:
+            os.kill(self.s.pid, 15)
 def test_main():
     if not hasattr(socket, "ssl"):
         raise test_support.TestSkipped("socket module has no ssl support")
@@ -125,9 +178,25 @@ def test_main():
     if test_support.is_resource_enabled('network'):
         tests.append(ConnectedTests)
 
+    # in these platforms we can kill the openssl process
+    if sys.platform in ("sunos5", "darwin", "linux1",
+                        "linux2", "win32", "hp-ux11"):
+        if haveOpenSSL():
+            haveServer = True
+            tests.append(OpenSSLTests)
+        else:
+            haveServer = False
+
+    if haveServer:
+        server = OpenSSLServer()
+        server.start()
+
     thread_info = test_support.threading_setup()
     test_support.run_unittest(*tests)
     test_support.threading_cleanup(*thread_info)
 
+    if haveServer:
+        server.shutdown()
+
 if __name__ == "__main__":
     test_main()