]> granicus.if.org Git - python/commitdiff
Fix/optimize test_asyncore.test_quick_connect() (#1188)
authorVictor Stinner <victor.stinner@gmail.com>
Wed, 19 Apr 2017 21:42:46 +0000 (23:42 +0200)
committerGitHub <noreply@github.com>
Wed, 19 Apr 2017 21:42:46 +0000 (23:42 +0200)
Don't use addCleanup() in test_quick_connect() because it keeps the
Thread object alive and so @reap_threads fails on its timeout of 1
second. "./python -m test -v test_asyncore -m test_quick_connect"
now takes 185 ms, instead of 11 seconds.

Other minor changes:

* Use "with sock:" to close the socket instead of
  try/finally: sock.close()
* Use self.skipTest() in test_quick_connect() to remove one
  indentation level and notice user that the test is specific to
  AF_INET and AF_INET6

Lib/test/test_asyncore.py

index d05462b7efce3582c5d5f8511910eef3d9a637da..270e9ccba6bd789bc3fb510a76a68a7486f3e917 100644 (file)
@@ -755,50 +755,49 @@ class BaseTestAPI:
     def test_set_reuse_addr(self):
         if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
             self.skipTest("Not applicable to AF_UNIX sockets.")
-        sock = socket.socket(self.family)
-        try:
-            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        except OSError:
-            unittest.skip("SO_REUSEADDR not supported on this platform")
-        else:
-            # if SO_REUSEADDR succeeded for sock we expect asyncore
-            # to do the same
-            s = asyncore.dispatcher(socket.socket(self.family))
-            self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
-                                                 socket.SO_REUSEADDR))
-            s.socket.close()
-            s.create_socket(self.family)
-            s.set_reuse_addr()
-            self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
-                                                 socket.SO_REUSEADDR))
-        finally:
-            sock.close()
+
+        with socket.socket(self.family) as sock:
+            try:
+                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            except OSError:
+                unittest.skip("SO_REUSEADDR not supported on this platform")
+            else:
+                # if SO_REUSEADDR succeeded for sock we expect asyncore
+                # to do the same
+                s = asyncore.dispatcher(socket.socket(self.family))
+                self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
+                                                     socket.SO_REUSEADDR))
+                s.socket.close()
+                s.create_socket(self.family)
+                s.set_reuse_addr()
+                self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
+                                                     socket.SO_REUSEADDR))
 
     @unittest.skipUnless(threading, 'Threading required for this test.')
     @support.reap_threads
     def test_quick_connect(self):
         # see: http://bugs.python.org/issue10340
-        if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
-            server = BaseServer(self.family, self.addr)
-            t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
-                                                              count=500))
-            t.start()
-            def cleanup():
-                t.join(timeout=TIMEOUT)
-                if t.is_alive():
-                    self.fail("join() timed out")
-            self.addCleanup(cleanup)
-
-            s = socket.socket(self.family, socket.SOCK_STREAM)
-            s.settimeout(.2)
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
-                         struct.pack('ii', 1, 0))
-            try:
-                s.connect(server.address)
-            except OSError:
-                pass
-            finally:
-                s.close()
+        if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
+            self.skipTest("test specific to AF_INET and AF_INET6")
+
+        server = BaseServer(self.family, self.addr)
+        t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
+                                                          count=500), name="ident")
+        t.start()
+        try:
+            with socket.socket(self.family, socket.SOCK_STREAM) as s:
+                s.settimeout(.2)
+                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+                             struct.pack('ii', 1, 0))
+
+                try:
+                    s.connect(server.address)
+                except OSError:
+                    pass
+        finally:
+            t.join(timeout=TIMEOUT)
+            if t.is_alive():
+                self.fail("join() timed out")
 
 class TestAPI_UseIPv4Sockets(BaseTestAPI):
     family = socket.AF_INET