]> granicus.if.org Git - python/commitdiff
Issue #17778: Fix test discovery for test_multiprocessing. (Patch by
authorRichard Oudkerk <shibturn@gmail.com>
Tue, 16 Jul 2013 14:33:41 +0000 (15:33 +0100)
committerRichard Oudkerk <shibturn@gmail.com>
Tue, 16 Jul 2013 14:33:41 +0000 (15:33 +0100)
Zachary Ware.)

Lib/test/test_multiprocessing.py
Misc/NEWS

index baeb3b9798eb761d2bc818c47fe1514ce1ae61b8..17abadebef66745428754d235376ae40717aa45c 100644 (file)
@@ -19,6 +19,7 @@ import socket
 import random
 import logging
 import struct
+import operator
 import test.support
 import test.script_helper
 
@@ -1624,6 +1625,18 @@ def mul(x, y):
 
 class _TestPool(BaseTestCase):
 
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.pool = cls.Pool(4)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.pool.terminate()
+        cls.pool.join()
+        cls.pool = None
+        super().tearDownClass()
+
     def test_apply(self):
         papply = self.pool.apply
         self.assertEqual(papply(sqr, (5,)), sqr(5))
@@ -1715,15 +1728,6 @@ class _TestPool(BaseTestCase):
         p.join()
 
     def test_terminate(self):
-        if self.TYPE == 'manager':
-            # On Unix a forked process increfs each shared object to
-            # which its parent process held a reference.  If the
-            # forked process gets terminated then there is likely to
-            # be a reference leak.  So to prevent
-            # _TestZZZNumberOfObjects from failing we skip this test
-            # when using a manager.
-            return
-
         result = self.pool.map_async(
             time.sleep, [0.1 for i in range(10000)], chunksize=1
             )
@@ -1751,7 +1755,6 @@ class _TestPool(BaseTestCase):
             with multiprocessing.Pool(2) as p:
                 r = p.map_async(sqr, L)
                 self.assertEqual(r.get(), expected)
-            print(p._state)
             self.assertRaises(ValueError, p.map_async, sqr, L)
 
 def raising():
@@ -1845,35 +1848,6 @@ class _TestPoolWorkerLifetime(BaseTestCase):
         for (j, res) in enumerate(results):
             self.assertEqual(res.get(), sqr(j))
 
-
-#
-# Test that manager has expected number of shared objects left
-#
-
-class _TestZZZNumberOfObjects(BaseTestCase):
-    # Because test cases are sorted alphabetically, this one will get
-    # run after all the other tests for the manager.  It tests that
-    # there have been no "reference leaks" for the manager's shared
-    # objects.  Note the comment in _TestPool.test_terminate().
-
-    # If some other test using ManagerMixin.manager fails, then the
-    # raised exception may keep alive a frame which holds a reference
-    # to a managed object.  This will cause test_number_of_objects to
-    # also fail.
-    ALLOWED_TYPES = ('manager',)
-
-    def test_number_of_objects(self):
-        EXPECTED_NUMBER = 1                # the pool object is still alive
-        multiprocessing.active_children()  # discard dead process objs
-        gc.collect()                       # do garbage collection
-        refs = self.manager._number_of_objects()
-        debug_info = self.manager._debug_info()
-        if refs != EXPECTED_NUMBER:
-            print(self.manager._debug_info())
-            print(debug_info)
-
-        self.assertEqual(refs, EXPECTED_NUMBER)
-
 #
 # Test of creating a customized manager class
 #
@@ -2051,7 +2025,7 @@ class _TestManagerRestart(BaseTestCase):
             address=addr, authkey=authkey, serializer=SERIALIZER)
         try:
             manager.start()
-        except IOError as e:
+        except OSError as e:
             if e.errno != errno.EADDRINUSE:
                 raise
             # Retry after some time, in case the old socket was lingering
@@ -2165,9 +2139,9 @@ class _TestConnection(BaseTestCase):
             self.assertEqual(reader.writable, False)
             self.assertEqual(writer.readable, False)
             self.assertEqual(writer.writable, True)
-            self.assertRaises(IOError, reader.send, 2)
-            self.assertRaises(IOError, writer.recv)
-            self.assertRaises(IOError, writer.poll)
+            self.assertRaises(OSError, reader.send, 2)
+            self.assertRaises(OSError, writer.recv)
+            self.assertRaises(OSError, writer.poll)
 
     def test_spawn_close(self):
         # We test that a pipe connection can be closed by parent
@@ -2329,8 +2303,8 @@ class _TestConnection(BaseTestCase):
         if self.TYPE == 'processes':
             self.assertTrue(a.closed)
             self.assertTrue(b.closed)
-            self.assertRaises(IOError, a.recv)
-            self.assertRaises(IOError, b.recv)
+            self.assertRaises(OSError, a.recv)
+            self.assertRaises(OSError, b.recv)
 
 class _TestListener(BaseTestCase):
 
@@ -2351,7 +2325,7 @@ class _TestListener(BaseTestCase):
                     self.assertEqual(d.recv(), 1729)
 
         if self.TYPE == 'processes':
-            self.assertRaises(IOError, l.accept)
+            self.assertRaises(OSError, l.accept)
 
 class _TestListenerClient(BaseTestCase):
 
@@ -2401,7 +2375,7 @@ class _TestListenerClient(BaseTestCase):
             c.close()
             l.close()
 
-class _TestPoll(unittest.TestCase):
+class _TestPoll(BaseTestCase):
 
     ALLOWED_TYPES = ('processes', 'threads')
 
@@ -2942,27 +2916,18 @@ class TestInvalidHandle(unittest.TestCase):
     def test_invalid_handles(self):
         conn = multiprocessing.connection.Connection(44977608)
         try:
-            self.assertRaises((ValueError, IOError), conn.poll)
+            self.assertRaises((ValueError, OSError), conn.poll)
         finally:
             # Hack private attribute _handle to avoid printing an error
             # in conn.__del__
             conn._handle = None
-        self.assertRaises((ValueError, IOError),
+        self.assertRaises((ValueError, OSError),
                           multiprocessing.connection.Connection, -1)
 
 #
 # Functions used to create test cases from the base ones in this module
 #
 
-def get_attributes(Source, names):
-    d = {}
-    for name in names:
-        obj = getattr(Source, name)
-        if type(obj) == type(get_attributes):
-            obj = staticmethod(obj)
-        d[name] = obj
-    return d
-
 def create_test_cases(Mixin, type):
     result = {}
     glob = globals()
@@ -2975,10 +2940,10 @@ def create_test_cases(Mixin, type):
             assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
             if type in base.ALLOWED_TYPES:
                 newname = 'With' + Type + name[1:]
-                class Temp(base, unittest.TestCase, Mixin):
+                class Temp(base, Mixin, unittest.TestCase):
                     pass
                 result[newname] = Temp
-                Temp.__name__ = newname
+                Temp.__name__ = Temp.__qualname__ = newname
                 Temp.__module__ = Mixin.__module__
     return result
 
@@ -2989,12 +2954,24 @@ def create_test_cases(Mixin, type):
 class ProcessesMixin(object):
     TYPE = 'processes'
     Process = multiprocessing.Process
-    locals().update(get_attributes(multiprocessing, (
-        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
-        'RawArray', 'current_process', 'active_children', 'Pipe',
-        'connection', 'JoinableQueue', 'Pool'
-        )))
+    connection = multiprocessing.connection
+    current_process = staticmethod(multiprocessing.current_process)
+    active_children = staticmethod(multiprocessing.active_children)
+    Pool = staticmethod(multiprocessing.Pool)
+    Pipe = staticmethod(multiprocessing.Pipe)
+    Queue = staticmethod(multiprocessing.Queue)
+    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
+    Lock = staticmethod(multiprocessing.Lock)
+    RLock = staticmethod(multiprocessing.RLock)
+    Semaphore = staticmethod(multiprocessing.Semaphore)
+    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
+    Condition = staticmethod(multiprocessing.Condition)
+    Event = staticmethod(multiprocessing.Event)
+    Barrier = staticmethod(multiprocessing.Barrier)
+    Value = staticmethod(multiprocessing.Value)
+    Array = staticmethod(multiprocessing.Array)
+    RawValue = staticmethod(multiprocessing.RawValue)
+    RawArray = staticmethod(multiprocessing.RawArray)
 
 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
 globals().update(testcases_processes)
@@ -3003,12 +2980,48 @@ globals().update(testcases_processes)
 class ManagerMixin(object):
     TYPE = 'manager'
     Process = multiprocessing.Process
-    manager = object.__new__(multiprocessing.managers.SyncManager)
-    locals().update(get_attributes(manager, (
-        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
-        'Namespace', 'JoinableQueue', 'Pool'
-        )))
+    Queue = property(operator.attrgetter('manager.Queue'))
+    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
+    Lock = property(operator.attrgetter('manager.Lock'))
+    RLock = property(operator.attrgetter('manager.RLock'))
+    Semaphore = property(operator.attrgetter('manager.Semaphore'))
+    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
+    Condition = property(operator.attrgetter('manager.Condition'))
+    Event = property(operator.attrgetter('manager.Event'))
+    Barrier = property(operator.attrgetter('manager.Barrier'))
+    Value = property(operator.attrgetter('manager.Value'))
+    Array = property(operator.attrgetter('manager.Array'))
+    list = property(operator.attrgetter('manager.list'))
+    dict = property(operator.attrgetter('manager.dict'))
+    Namespace = property(operator.attrgetter('manager.Namespace'))
+
+    @classmethod
+    def Pool(cls, *args, **kwds):
+        return cls.manager.Pool(*args, **kwds)
+
+    @classmethod
+    def setUpClass(cls):
+        cls.manager = multiprocessing.Manager()
+
+    @classmethod
+    def tearDownClass(cls):
+        # only the manager process should be returned by active_children()
+        # but this can take a bit on slow machines, so wait a few seconds
+        # if there are other children too (see #17395)
+        t = 0.01
+        while len(multiprocessing.active_children()) > 1 and t < 5:
+            time.sleep(t)
+            t *= 2
+        gc.collect()                       # do garbage collection
+        if cls.manager._number_of_objects() != 0:
+            # This is not really an error since some tests do not
+            # ensure that all processes which hold a reference to a
+            # managed object have been joined.
+            print('Shared objects which still exist at manager shutdown:')
+            print(cls.manager._debug_info())
+        cls.manager.shutdown()
+        cls.manager.join()
+        cls.manager = None
 
 testcases_manager = create_test_cases(ManagerMixin, type='manager')
 globals().update(testcases_manager)
@@ -3017,16 +3030,27 @@ globals().update(testcases_manager)
 class ThreadsMixin(object):
     TYPE = 'threads'
     Process = multiprocessing.dummy.Process
-    locals().update(get_attributes(multiprocessing.dummy, (
-        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
-        'active_children', 'Pipe', 'connection', 'dict', 'list',
-        'Namespace', 'JoinableQueue', 'Pool'
-        )))
+    connection = multiprocessing.dummy.connection
+    current_process = staticmethod(multiprocessing.dummy.current_process)
+    active_children = staticmethod(multiprocessing.dummy.active_children)
+    Pool = staticmethod(multiprocessing.Pool)
+    Pipe = staticmethod(multiprocessing.dummy.Pipe)
+    Queue = staticmethod(multiprocessing.dummy.Queue)
+    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
+    Lock = staticmethod(multiprocessing.dummy.Lock)
+    RLock = staticmethod(multiprocessing.dummy.RLock)
+    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
+    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
+    Condition = staticmethod(multiprocessing.dummy.Condition)
+    Event = staticmethod(multiprocessing.dummy.Event)
+    Barrier = staticmethod(multiprocessing.dummy.Barrier)
+    Value = staticmethod(multiprocessing.dummy.Value)
+    Array = staticmethod(multiprocessing.dummy.Array)
 
 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
 globals().update(testcases_threads)
 
+
 class OtherTest(unittest.TestCase):
     # TODO: add more tests for deliver/answer challenge.
     def test_deliver_challenge_auth_failure(self):
@@ -3532,16 +3556,7 @@ class TestIgnoreEINTR(unittest.TestCase):
 #
 #
 
-testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
-                   TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
-                   TestFlags, TestTimeouts, TestNoForkBomb,
-                   TestForkAwareThreadLock, TestIgnoreEINTR]
-
-#
-#
-#
-
-def test_main(run=None):
+def setUpModule():
     if sys.platform.startswith("linux"):
         try:
             lock = multiprocessing.RLock()
@@ -3550,43 +3565,10 @@ def test_main(run=None):
 
     check_enough_semaphores()
 
-    if run is None:
-        from test.support import run_unittest as run
-
     util.get_temp_dir()     # creates temp directory for use by all processes
 
     multiprocessing.get_logger().setLevel(LOG_LEVEL)
 
-    ProcessesMixin.pool = multiprocessing.Pool(4)
-    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
-    ManagerMixin.manager.__init__()
-    ManagerMixin.manager.start()
-    ManagerMixin.pool = ManagerMixin.manager.Pool(4)
-
-    testcases = (
-        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
-        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
-        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
-        testcases_other
-        )
-
-    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
-    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
-    try:
-        run(suite)
-    finally:
-        ThreadsMixin.pool.terminate()
-        ProcessesMixin.pool.terminate()
-        ManagerMixin.pool.terminate()
-        ManagerMixin.pool.join()
-        ManagerMixin.manager.shutdown()
-        ManagerMixin.manager.join()
-        ThreadsMixin.pool.join()
-        ProcessesMixin.pool.join()
-        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
-
-def main():
-    test_main(unittest.TextTestRunner(verbosity=2).run)
 
 if __name__ == '__main__':
-    main()
+    unittest.main()
index 5bfa69641edc21c4f3612b8a69278cdfa6df8f7c..9bb80e94b5356b60575e461f881ea7621187041a 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -49,6 +49,9 @@ Core and Builtins
 Library
 -------
 
+- Issue #17778: Fix test discovery for test_multiprocessing. (Patch by
+  Zachary Ware.)
+
 - Issue #18431: The new email header parser now decodes RFC2047 encoded words
   in structured headers.