]> granicus.if.org Git - python/commitdiff
Rewrite test_socketserver as unittest, written for GHOP by Benjamin Petersen.
authorGeorg Brandl <georg@python.org>
Sat, 2 Feb 2008 11:05:00 +0000 (11:05 +0000)
committerGeorg Brandl <georg@python.org>
Sat, 2 Feb 2008 11:05:00 +0000 (11:05 +0000)
Lib/test/test_socketserver.py

index dd094489da06d91f78e2d761671d48bc9ed44b13..b66eea2282da9062456c297edf1f373e0c6bac03 100644 (file)
@@ -1,20 +1,32 @@
-# Test suite for SocketServer.py
+"""
+Test suite for SocketServer.py.
+"""
 
-from test import test_support
-from test.test_support import (verbose, verify, TESTFN, TestSkipped,
-                               reap_children)
-test_support.requires('network')
-
-from SocketServer import *
+import os
 import socket
 import errno
+import imp
 import select
 import time
 import threading
-import os
+from functools import wraps
+import unittest
+import SocketServer
+
+import test.test_support
+from test.test_support import reap_children, verbose, TestSkipped
+from test.test_support import TESTFN as TEST_FILE
+
+test.test_support.requires("network")
 
 NREQ = 3
 DELAY = 0.5
+TEST_STR = "hello world\n"
+HOST = "localhost"
+
+HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
+HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
+
 
 class MyMixinHandler:
     def handle(self):
@@ -23,50 +35,41 @@ class MyMixinHandler:
         time.sleep(DELAY)
         self.wfile.write(line)
 
-class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
+
+def receive(sock, n, timeout=20):
+    r, w, x = select.select([sock], [], [], timeout)
+    if sock in r:
+        return sock.recv(n)
+    else:
+        raise RuntimeError, "timed out on %r" % (sock,)
+
+
+class MyStreamHandler(MyMixinHandler, SocketServer.StreamRequestHandler):
     pass
 
-class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
+class MyDatagramHandler(MyMixinHandler,
+    SocketServer.DatagramRequestHandler):
     pass
 
+class ForkingUnixStreamServer(SocketServer.ForkingMixIn,
+                              SocketServer.UnixStreamServer):
+    pass
+
+class ForkingUnixDatagramServer(SocketServer.ForkingMixIn,
+                                SocketServer.UnixDatagramServer):
+    pass
+
+
 class MyMixinServer:
     def serve_a_few(self):
         for i in range(NREQ):
             self.handle_request()
+
     def handle_error(self, request, client_address):
         self.close_request(request)
         self.server_close()
         raise
 
-teststring = "hello world\n"
-
-def receive(sock, n, timeout=20):
-    r, w, x = select.select([sock], [], [], timeout)
-    if sock in r:
-        return sock.recv(n)
-    else:
-        raise RuntimeError, "timed out on %r" % (sock,)
-
-def testdgram(proto, addr):
-    s = socket.socket(proto, socket.SOCK_DGRAM)
-    s.sendto(teststring, addr)
-    buf = data = receive(s, 100)
-    while data and '\n' not in buf:
-        data = receive(s, 100)
-        buf += data
-    verify(buf == teststring)
-    s.close()
-
-def teststream(proto, addr):
-    s = socket.socket(proto, socket.SOCK_STREAM)
-    s.connect(addr)
-    s.sendall(teststring)
-    buf = data = receive(s, 100)
-    while data and '\n' not in buf:
-        data = receive(s, 100)
-        buf += data
-    verify(buf == teststring)
-    s.close()
 
 class ServerThread(threading.Thread):
     def __init__(self, addr, svrcls, hdlrcls):
@@ -75,6 +78,7 @@ class ServerThread(threading.Thread):
         self.__svrcls = svrcls
         self.__hdlrcls = hdlrcls
         self.ready = threading.Event()
+
     def run(self):
         class svrcls(MyMixinServer, self.__svrcls):
             pass
@@ -93,64 +97,8 @@ class ServerThread(threading.Thread):
         svr.serve_a_few()
         if verbose: print "thread: done"
 
-seed = 0
-def pickport():
-    global seed
-    seed += 1
-    return 10000 + (os.getpid() % 1000)*10 + seed
-
-host = "localhost"
-testfiles = []
-def pickaddr(proto):
-    if proto == socket.AF_INET:
-        return (host, pickport())
-    else:
-        fn = TESTFN + str(pickport())
-        if os.name == 'os2':
-            # AF_UNIX socket names on OS/2 require a specific prefix
-            # which can't include a drive letter and must also use
-            # backslashes as directory separators
-            if fn[1] == ':':
-                fn = fn[2:]
-            if fn[0] in (os.sep, os.altsep):
-                fn = fn[1:]
-            fn = os.path.join('\socket', fn)
-            if os.sep == '/':
-                fn = fn.replace(os.sep, os.altsep)
-            else:
-                fn = fn.replace(os.altsep, os.sep)
-        testfiles.append(fn)
-        return fn
-
-def cleanup():
-    for fn in testfiles:
-        try:
-            os.remove(fn)
-        except os.error:
-            pass
-    testfiles[:] = []
-
-def testloop(proto, servers, hdlrcls, testfunc):
-    for svrcls in servers:
-        addr = pickaddr(proto)
-        if verbose:
-            print "ADDR =", addr
-            print "CLASS =", svrcls
-        t = ServerThread(addr, svrcls, hdlrcls)
-        if verbose: print "server created"
-        t.start()
-        if verbose: print "server running"
-        for i in range(NREQ):
-            t.ready.wait(10*DELAY)
-            if not t.ready.isSet():
-                raise RuntimeError("Server not ready within a reasonable time")
-            if verbose: print "test client", i
-            testfunc(proto, addr)
-        if verbose: print "waiting for server"
-        t.join()
-        if verbose: print "done"
-
-class ForgivingTCPServer(TCPServer):
+
+class ForgivingTCPServer(SocketServer.TCPServer):
     # prevent errors if another process is using the port we want
     def server_bind(self):
         host, default_port = self.server_address
@@ -160,64 +108,147 @@ class ForgivingTCPServer(TCPServer):
         for port in [default_port, 3434, 8798, 23833]:
             try:
                 self.server_address = host, port
-                TCPServer.server_bind(self)
+                SocketServer.TCPServer.server_bind(self)
                 break
             except socket.error, (err, msg):
                 if err != errno.EADDRINUSE:
                     raise
-                print >>sys.__stderr__, \
-                    '  WARNING: failed to listen on port %d, trying another' % port
-
-tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
-if hasattr(os, 'fork') and os.name not in ('os2',):
-    tcpservers.append(ForkingTCPServer)
-udpservers = [UDPServer, ThreadingUDPServer]
-if hasattr(os, 'fork') and os.name not in ('os2',):
-    udpservers.append(ForkingUDPServer)
-
-if not hasattr(socket, 'AF_UNIX'):
-    streamservers = []
-    dgramservers = []
-else:
-    class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
-    streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
-    if hasattr(os, 'fork') and os.name not in ('os2',):
-        streamservers.append(ForkingUnixStreamServer)
-    class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
-    dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
-    if hasattr(os, 'fork') and os.name not in ('os2',):
-        dgramservers.append(ForkingUnixDatagramServer)
-
-def sloppy_cleanup():
-    # See http://python.org/sf/1540386
-    # We need to reap children here otherwise a child from one server
-    # can be left running for the next server and cause a test failure.
-    time.sleep(DELAY)
-    reap_children()
-
-def testall():
-    testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
-    sloppy_cleanup()
-    testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
-    if hasattr(socket, 'AF_UNIX'):
-        sloppy_cleanup()
-        testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
-        # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
-        # client address so this cannot work:
-        ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
+                print >> sys.__stderr__, \
+                    "WARNING: failed to listen on port %d, trying another: " % port
+
+
+class SocketServerTest(unittest.TestCase):
+    """Test all socket servers."""
+
+    def setUp(self):
+        self.port_seed = 0
+        self.test_files = []
+
+    def tearDown(self):
+        reap_children()
+
+        for fn in self.test_files:
+            try:
+                os.remove(fn)
+            except os.error:
+                pass
+        self.test_files[:] = []
+
+    def pickport(self):
+        self.port_seed += 1
+        return 10000 + (os.getpid() % 1000)*10 + self.port_seed
+
+    def pickaddr(self, proto):
+        if proto == socket.AF_INET:
+            return (HOST, self.pickport())
+        else:
+            fn = TEST_FILE + str(self.pickport())
+            if os.name == 'os2':
+                # AF_UNIX socket names on OS/2 require a specific prefix
+                # which can't include a drive letter and must also use
+                # backslashes as directory separators
+                if fn[1] == ':':
+                    fn = fn[2:]
+                if fn[0] in (os.sep, os.altsep):
+                    fn = fn[1:]
+                fn = os.path.join('\socket', fn)
+                if os.sep == '/':
+                    fn = fn.replace(os.sep, os.altsep)
+                else:
+                    fn = fn.replace(os.altsep, os.sep)
+            self.test_files.append(fn)
+            return fn
+
+    def run_servers(self, proto, servers, hdlrcls, testfunc):
+        for svrcls in servers:
+            addr = self.pickaddr(proto)
+            if verbose:
+                print "ADDR =", addr
+                print "CLASS =", svrcls
+            t = ServerThread(addr, svrcls, hdlrcls)
+            if verbose: print "server created"
+            t.start()
+            if verbose: print "server running"
+            for i in range(NREQ):
+                t.ready.wait(10*DELAY)
+                self.assert_(t.ready.isSet(),
+                    "Server not ready within a reasonable time")
+                if verbose: print "test client", i
+                testfunc(proto, addr)
+            if verbose: print "waiting for server"
+            t.join()
+            if verbose: print "done"
+
+    def stream_examine(self, proto, addr):
+        s = socket.socket(proto, socket.SOCK_STREAM)
+        s.connect(addr)
+        s.sendall(TEST_STR)
+        buf = data = receive(s, 100)
+        while data and '\n' not in buf:
+            data = receive(s, 100)
+            buf += data
+        self.assertEquals(buf, TEST_STR)
+        s.close()
+
+    def dgram_examine(self, proto, addr):
+        s = socket.socket(proto, socket.SOCK_DGRAM)
+        s.sendto(TEST_STR, addr)
+        buf = data = receive(s, 100)
+        while data and '\n' not in buf:
+            data = receive(s, 100)
+            buf += data
+        self.assertEquals(buf, TEST_STR)
+        s.close()
+
+    def test_TCPServers(self):
+        # Test SocketServer.TCPServer
+        servers = [ForgivingTCPServer, SocketServer.ThreadingTCPServer]
+        if HAVE_FORKING:
+            servers.append(SocketServer.ForkingTCPServer)
+        self.run_servers(socket.AF_INET, servers,
+                         MyStreamHandler, self.stream_examine)
+
+    def test_UDPServers(self):
+        # Test SocketServer.UPDServer
+        servers = [SocketServer.UDPServer,
+                   SocketServer.ThreadingUDPServer]
+        if HAVE_FORKING:
+            servers.append(SocketServer.ForkingUDPServer)
+        self.run_servers(socket.AF_INET, servers, MyDatagramHandler,
+                         self.dgram_examine)
+
+    def test_stream_servers(self):
+        # Test SocketServer's stream servers
+        if not HAVE_UNIX_SOCKETS:
+            return
+        servers = [SocketServer.UnixStreamServer,
+                   SocketServer.ThreadingUnixStreamServer]
+        if HAVE_FORKING:
+            servers.append(ForkingUnixStreamServer)
+        self.run_servers(socket.AF_UNIX, servers, MyStreamHandler,
+                         self.stream_examine)
+
+    # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
+    # client address so this cannot work:
+
+    # def test_dgram_servers(self):
+    #     # Test SocketServer.UnixDatagramServer
+    #     if not HAVE_UNIX_SOCKETS:
+    #         return
+    #     servers = [SocketServer.UnixDatagramServer,
+    #                SocketServer.ThreadingUnixDatagramServer]
+    #     if HAVE_FORKING:
+    #         servers.append(ForkingUnixDatagramServer)
+    #     self.run_servers(socket.AF_UNIX, servers, MyDatagramHandler,
+    #                      self.dgram_examine)
+
 
 def test_main():
-    import imp
     if imp.lock_held():
-        # If the import lock is held, the threads will hang.
+        # If the import lock is held, the threads will hang
         raise TestSkipped("can't run when import lock is held")
 
-    reap_children()
-    try:
-        testall()
-    finally:
-        cleanup()
-    reap_children()
+    test.test_support.run_unittest(SocketServerTest)
 
 if __name__ == "__main__":
     test_main()