]> granicus.if.org Git - python/commitdiff
Merged revisions 76137,76172 via svnmerge from
authorAntoine Pitrou <solipsis@pitrou.net>
Mon, 9 Nov 2009 16:47:50 +0000 (16:47 +0000)
committerAntoine Pitrou <solipsis@pitrou.net>
Mon, 9 Nov 2009 16:47:50 +0000 (16:47 +0000)
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines

  Issue #7270: Add some dedicated unit tests for multi-thread synchronization
  primitives such as Lock, RLock, Condition, Event and Semaphore.
........
  r76172 | antoine.pitrou | 2009-11-09 17:00:11 +0100 (lun., 09 nov. 2009) | 5 lines

  Issue #7282: Fix a memory leak when an RLock was used in a thread other
  than those started through `threading.Thread` (for example, using
  `thread.start_new_thread()`.
........

Lib/test/lock_tests.py [new file with mode: 0644]
Lib/test/test_thread.py
Lib/test/test_threading.py
Lib/threading.py
Misc/NEWS

diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
new file mode 100644 (file)
index 0000000..855ad46
--- /dev/null
@@ -0,0 +1,550 @@
+"""
+Various tests for synchronization primitives.
+"""
+
+import sys
+import time
+from thread import start_new_thread, get_ident
+import threading
+import unittest
+
+from test import test_support as support
+
+
+def _wait():
+    # A crude wait/yield function not relying on synchronization primitives.
+    time.sleep(0.01)
+
+class Bunch(object):
+    """
+    A bunch of threads.
+    """
+    def __init__(self, f, n, wait_before_exit=False):
+        """
+        Construct a bunch of `n` threads running the same function `f`.
+        If `wait_before_exit` is True, the threads won't terminate until
+        do_finish() is called.
+        """
+        self.f = f
+        self.n = n
+        self.started = []
+        self.finished = []
+        self._can_exit = not wait_before_exit
+        def task():
+            tid = get_ident()
+            self.started.append(tid)
+            try:
+                f()
+            finally:
+                self.finished.append(tid)
+                while not self._can_exit:
+                    _wait()
+        for i in range(n):
+            start_new_thread(task, ())
+
+    def wait_for_started(self):
+        while len(self.started) < self.n:
+            _wait()
+
+    def wait_for_finished(self):
+        while len(self.finished) < self.n:
+            _wait()
+
+    def do_finish(self):
+        self._can_exit = True
+
+
+class BaseTestCase(unittest.TestCase):
+    def setUp(self):
+        self._threads = support.threading_setup()
+
+    def tearDown(self):
+        support.threading_cleanup(*self._threads)
+        support.reap_children()
+
+
+class BaseLockTests(BaseTestCase):
+    """
+    Tests for both recursive and non-recursive locks.
+    """
+
+    def test_constructor(self):
+        lock = self.locktype()
+        del lock
+
+    def test_acquire_destroy(self):
+        lock = self.locktype()
+        lock.acquire()
+        del lock
+
+    def test_acquire_release(self):
+        lock = self.locktype()
+        lock.acquire()
+        lock.release()
+        del lock
+
+    def test_try_acquire(self):
+        lock = self.locktype()
+        self.assertTrue(lock.acquire(False))
+        lock.release()
+
+    def test_try_acquire_contended(self):
+        lock = self.locktype()
+        lock.acquire()
+        result = []
+        def f():
+            result.append(lock.acquire(False))
+        Bunch(f, 1).wait_for_finished()
+        self.assertFalse(result[0])
+        lock.release()
+
+    def test_acquire_contended(self):
+        lock = self.locktype()
+        lock.acquire()
+        N = 5
+        def f():
+            lock.acquire()
+            lock.release()
+
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(len(b.finished), 0)
+        lock.release()
+        b.wait_for_finished()
+        self.assertEqual(len(b.finished), N)
+
+    def test_with(self):
+        lock = self.locktype()
+        def f():
+            lock.acquire()
+            lock.release()
+        def _with(err=None):
+            with lock:
+                if err is not None:
+                    raise err
+        _with()
+        # Check the lock is unacquired
+        Bunch(f, 1).wait_for_finished()
+        self.assertRaises(TypeError, _with, TypeError)
+        # Check the lock is unacquired
+        Bunch(f, 1).wait_for_finished()
+
+    def test_thread_leak(self):
+        # The lock shouldn't leak a Thread instance when used from a foreign
+        # (non-threading) thread.
+        lock = self.locktype()
+        def f():
+            lock.acquire()
+            lock.release()
+        n = len(threading.enumerate())
+        # We run many threads in the hope that existing threads ids won't
+        # be recycled.
+        Bunch(f, 15).wait_for_finished()
+        self.assertEqual(n, len(threading.enumerate()))
+
+
+class LockTests(BaseLockTests):
+    """
+    Tests for non-recursive, weak locks
+    (which can be acquired and released from different threads).
+    """
+    def test_reacquire(self):
+        # Lock needs to be released before re-acquiring.
+        lock = self.locktype()
+        phase = []
+        def f():
+            lock.acquire()
+            phase.append(None)
+            lock.acquire()
+            phase.append(None)
+        start_new_thread(f, ())
+        while len(phase) == 0:
+            _wait()
+        _wait()
+        self.assertEqual(len(phase), 1)
+        lock.release()
+        while len(phase) == 1:
+            _wait()
+        self.assertEqual(len(phase), 2)
+
+    def test_different_thread(self):
+        # Lock can be released from a different thread.
+        lock = self.locktype()
+        lock.acquire()
+        def f():
+            lock.release()
+        b = Bunch(f, 1)
+        b.wait_for_finished()
+        lock.acquire()
+        lock.release()
+
+
+class RLockTests(BaseLockTests):
+    """
+    Tests for recursive locks.
+    """
+    def test_reacquire(self):
+        lock = self.locktype()
+        lock.acquire()
+        lock.acquire()
+        lock.release()
+        lock.acquire()
+        lock.release()
+        lock.release()
+
+    def test_release_unacquired(self):
+        # Cannot release an unacquired lock
+        lock = self.locktype()
+        self.assertRaises(RuntimeError, lock.release)
+        lock.acquire()
+        lock.acquire()
+        lock.release()
+        lock.acquire()
+        lock.release()
+        lock.release()
+        self.assertRaises(RuntimeError, lock.release)
+
+    def test_different_thread(self):
+        # Cannot release from a different thread
+        lock = self.locktype()
+        def f():
+            lock.acquire()
+        b = Bunch(f, 1, True)
+        try:
+            self.assertRaises(RuntimeError, lock.release)
+        finally:
+            b.do_finish()
+
+    def test__is_owned(self):
+        lock = self.locktype()
+        self.assertFalse(lock._is_owned())
+        lock.acquire()
+        self.assertTrue(lock._is_owned())
+        lock.acquire()
+        self.assertTrue(lock._is_owned())
+        result = []
+        def f():
+            result.append(lock._is_owned())
+        Bunch(f, 1).wait_for_finished()
+        self.assertFalse(result[0])
+        lock.release()
+        self.assertTrue(lock._is_owned())
+        lock.release()
+        self.assertFalse(lock._is_owned())
+
+
+class EventTests(BaseTestCase):
+    """
+    Tests for Event objects.
+    """
+
+    def test_is_set(self):
+        evt = self.eventtype()
+        self.assertFalse(evt.is_set())
+        evt.set()
+        self.assertTrue(evt.is_set())
+        evt.set()
+        self.assertTrue(evt.is_set())
+        evt.clear()
+        self.assertFalse(evt.is_set())
+        evt.clear()
+        self.assertFalse(evt.is_set())
+
+    def _check_notify(self, evt):
+        # All threads get notified
+        N = 5
+        results1 = []
+        results2 = []
+        def f():
+            evt.wait()
+            results1.append(evt.is_set())
+            evt.wait()
+            results2.append(evt.is_set())
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(len(results1), 0)
+        evt.set()
+        b.wait_for_finished()
+        self.assertEqual(results1, [True] * N)
+        self.assertEqual(results2, [True] * N)
+
+    def test_notify(self):
+        evt = self.eventtype()
+        self._check_notify(evt)
+        # Another time, after an explicit clear()
+        evt.set()
+        evt.clear()
+        self._check_notify(evt)
+
+    def test_timeout(self):
+        evt = self.eventtype()
+        results1 = []
+        results2 = []
+        N = 5
+        def f():
+            evt.wait(0.0)
+            results1.append(evt.is_set())
+            t1 = time.time()
+            evt.wait(0.2)
+            r = evt.is_set()
+            t2 = time.time()
+            results2.append((r, t2 - t1))
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(results1, [False] * N)
+        for r, dt in results2:
+            self.assertFalse(r)
+            self.assertTrue(dt >= 0.2, dt)
+        # The event is set
+        results1 = []
+        results2 = []
+        evt.set()
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(results1, [True] * N)
+        for r, dt in results2:
+            self.assertTrue(r)
+
+
+class ConditionTests(BaseTestCase):
+    """
+    Tests for condition variables.
+    """
+
+    def test_acquire(self):
+        cond = self.condtype()
+        # Be default we have an RLock: the condition can be acquired multiple
+        # times.
+        cond.acquire()
+        cond.acquire()
+        cond.release()
+        cond.release()
+        lock = threading.Lock()
+        cond = self.condtype(lock)
+        cond.acquire()
+        self.assertFalse(lock.acquire(False))
+        cond.release()
+        self.assertTrue(lock.acquire(False))
+        self.assertFalse(cond.acquire(False))
+        lock.release()
+        with cond:
+            self.assertFalse(lock.acquire(False))
+
+    def test_unacquired_wait(self):
+        cond = self.condtype()
+        self.assertRaises(RuntimeError, cond.wait)
+
+    def test_unacquired_notify(self):
+        cond = self.condtype()
+        self.assertRaises(RuntimeError, cond.notify)
+
+    def _check_notify(self, cond):
+        N = 5
+        results1 = []
+        results2 = []
+        phase_num = 0
+        def f():
+            cond.acquire()
+            cond.wait()
+            cond.release()
+            results1.append(phase_num)
+            cond.acquire()
+            cond.wait()
+            cond.release()
+            results2.append(phase_num)
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(results1, [])
+        # Notify 3 threads at first
+        cond.acquire()
+        cond.notify(3)
+        _wait()
+        phase_num = 1
+        cond.release()
+        while len(results1) < 3:
+            _wait()
+        self.assertEqual(results1, [1] * 3)
+        self.assertEqual(results2, [])
+        # Notify 5 threads: they might be in their first or second wait
+        cond.acquire()
+        cond.notify(5)
+        _wait()
+        phase_num = 2
+        cond.release()
+        while len(results1) + len(results2) < 8:
+            _wait()
+        self.assertEqual(results1, [1] * 3 + [2] * 2)
+        self.assertEqual(results2, [2] * 3)
+        # Notify all threads: they are all in their second wait
+        cond.acquire()
+        cond.notify_all()
+        _wait()
+        phase_num = 3
+        cond.release()
+        while len(results2) < 5:
+            _wait()
+        self.assertEqual(results1, [1] * 3 + [2] * 2)
+        self.assertEqual(results2, [2] * 3 + [3] * 2)
+        b.wait_for_finished()
+
+    def test_notify(self):
+        cond = self.condtype()
+        self._check_notify(cond)
+        # A second time, to check internal state is still ok.
+        self._check_notify(cond)
+
+    def test_timeout(self):
+        cond = self.condtype()
+        results = []
+        N = 5
+        def f():
+            cond.acquire()
+            t1 = time.time()
+            cond.wait(0.2)
+            t2 = time.time()
+            cond.release()
+            results.append(t2 - t1)
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(len(results), 5)
+        for dt in results:
+            self.assertTrue(dt >= 0.2, dt)
+
+
+class BaseSemaphoreTests(BaseTestCase):
+    """
+    Common tests for {bounded, unbounded} semaphore objects.
+    """
+
+    def test_constructor(self):
+        self.assertRaises(ValueError, self.semtype, value = -1)
+        self.assertRaises(ValueError, self.semtype, value = -sys.maxint)
+
+    def test_acquire(self):
+        sem = self.semtype(1)
+        sem.acquire()
+        sem.release()
+        sem = self.semtype(2)
+        sem.acquire()
+        sem.acquire()
+        sem.release()
+        sem.release()
+
+    def test_acquire_destroy(self):
+        sem = self.semtype()
+        sem.acquire()
+        del sem
+
+    def test_acquire_contended(self):
+        sem = self.semtype(7)
+        sem.acquire()
+        N = 10
+        results1 = []
+        results2 = []
+        phase_num = 0
+        def f():
+            sem.acquire()
+            results1.append(phase_num)
+            sem.acquire()
+            results2.append(phase_num)
+        b = Bunch(f, 10)
+        b.wait_for_started()
+        while len(results1) + len(results2) < 6:
+            _wait()
+        self.assertEqual(results1 + results2, [0] * 6)
+        phase_num = 1
+        for i in range(7):
+            sem.release()
+        while len(results1) + len(results2) < 13:
+            _wait()
+        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
+        phase_num = 2
+        for i in range(6):
+            sem.release()
+        while len(results1) + len(results2) < 19:
+            _wait()
+        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
+        # The semaphore is still locked
+        self.assertFalse(sem.acquire(False))
+        # Final release, to let the last thread finish
+        sem.release()
+        b.wait_for_finished()
+
+    def test_try_acquire(self):
+        sem = self.semtype(2)
+        self.assertTrue(sem.acquire(False))
+        self.assertTrue(sem.acquire(False))
+        self.assertFalse(sem.acquire(False))
+        sem.release()
+        self.assertTrue(sem.acquire(False))
+
+    def test_try_acquire_contended(self):
+        sem = self.semtype(4)
+        sem.acquire()
+        results = []
+        def f():
+            results.append(sem.acquire(False))
+            results.append(sem.acquire(False))
+        Bunch(f, 5).wait_for_finished()
+        # There can be a thread switch between acquiring the semaphore and
+        # appending the result, therefore results will not necessarily be
+        # ordered.
+        self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )
+
+    def test_default_value(self):
+        # The default initial value is 1.
+        sem = self.semtype()
+        sem.acquire()
+        def f():
+            sem.acquire()
+            sem.release()
+        b = Bunch(f, 1)
+        b.wait_for_started()
+        _wait()
+        self.assertFalse(b.finished)
+        sem.release()
+        b.wait_for_finished()
+
+    def test_with(self):
+        sem = self.semtype(2)
+        def _with(err=None):
+            with sem:
+                self.assertTrue(sem.acquire(False))
+                sem.release()
+                with sem:
+                    self.assertFalse(sem.acquire(False))
+                    if err:
+                        raise err
+        _with()
+        self.assertTrue(sem.acquire(False))
+        sem.release()
+        self.assertRaises(TypeError, _with, TypeError)
+        self.assertTrue(sem.acquire(False))
+        sem.release()
+
+class SemaphoreTests(BaseSemaphoreTests):
+    """
+    Tests for unbounded semaphores.
+    """
+
+    def test_release_unacquired(self):
+        # Unbounded releases are allowed and increment the semaphore's value
+        sem = self.semtype(1)
+        sem.release()
+        sem.acquire()
+        sem.acquire()
+        sem.release()
+
+
+class BoundedSemaphoreTests(BaseSemaphoreTests):
+    """
+    Tests for bounded semaphores.
+    """
+
+    def test_release_unacquired(self):
+        # Cannot go past the initial value
+        sem = self.semtype()
+        self.assertRaises(ValueError, sem.release)
+        sem.acquire()
+        sem.release()
+        self.assertRaises(ValueError, sem.release)
index 66ad22f25adb1154bd6234e991be9eff04992469..8bbfdb11edbfd6dc7160c7def16daa2aea4962f1 100644 (file)
@@ -5,6 +5,7 @@ from test import test_support
 import thread
 import time
 
+from test import lock_tests
 
 NUMTASKS = 10
 NUMTRIPS = 3
@@ -164,8 +165,12 @@ class BarrierTest(BasicThreadTest):
             self.done_mutex.release()
 
 
+class LockTests(lock_tests.LockTests):
+    locktype = thread.allocate_lock
+
+
 def test_main():
-    test_support.run_unittest(ThreadRunningTests, BarrierTest)
+    test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests)
 
 if __name__ == "__main__":
     test_main()
index 04e37a786bcc261f87b84d8962246bea0dc9823a..edf74de1cc7066cd16644d87f1fd76d337e5e3b8 100644 (file)
@@ -11,6 +11,8 @@ import time
 import unittest
 import weakref
 
+from test import lock_tests
+
 # A trivial mutable counter.
 class Counter(object):
     def __init__(self):
@@ -132,11 +134,9 @@ class ThreadTests(unittest.TestCase):
     def test_foreign_thread(self):
         # Check that a "foreign" thread can use the threading module.
         def f(mutex):
-            # Acquiring an RLock forces an entry for the foreign
+            # Calling current_thread() forces an entry for the foreign
             # thread to get made in the threading._active map.
-            r = threading.RLock()
-            r.acquire()
-            r.release()
+            threading.current_thread()
             mutex.release()
 
         mutex = threading.Lock()
@@ -453,22 +453,6 @@ class ThreadingExceptionTests(unittest.TestCase):
         thread.start()
         self.assertRaises(RuntimeError, thread.start)
 
-    def test_releasing_unacquired_rlock(self):
-        rlock = threading.RLock()
-        self.assertRaises(RuntimeError, rlock.release)
-
-    def test_waiting_on_unacquired_condition(self):
-        cond = threading.Condition()
-        self.assertRaises(RuntimeError, cond.wait)
-
-    def test_notify_on_unacquired_condition(self):
-        cond = threading.Condition()
-        self.assertRaises(RuntimeError, cond.notify)
-
-    def test_semaphore_with_negative_value(self):
-        self.assertRaises(ValueError, threading.Semaphore, value = -1)
-        self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxint)
-
     def test_joining_current_thread(self):
         current_thread = threading.current_thread()
         self.assertRaises(RuntimeError, current_thread.join);
@@ -483,8 +467,34 @@ class ThreadingExceptionTests(unittest.TestCase):
         self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
 
 
+class LockTests(lock_tests.LockTests):
+    locktype = staticmethod(threading.Lock)
+
+class RLockTests(lock_tests.RLockTests):
+    locktype = staticmethod(threading.RLock)
+
+class EventTests(lock_tests.EventTests):
+    eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+    # An Condition uses an RLock by default and exports its API.
+    locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+    condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+    semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+    semtype = staticmethod(threading.BoundedSemaphore)
+
+
 def test_main():
-    test.test_support.run_unittest(ThreadTests,
+    test.test_support.run_unittest(LockTests, RLockTests, EventTests,
+                                   ConditionAsRLockTests, ConditionTests,
+                                   SemaphoreTests, BoundedSemaphoreTests,
+                                   ThreadTests,
                                    ThreadJoinOnShutdown,
                                    ThreadingExceptionTests,
                                    )
index 18c28b7ffa42e3f95194f94e8390f597682b5571..15c5f2931551c235a932f120124608855d1e6a85 100644 (file)
@@ -106,14 +106,16 @@ class _RLock(_Verbose):
 
     def __repr__(self):
         owner = self.__owner
-        return "<%s(%s, %d)>" % (
-                self.__class__.__name__,
-                owner and owner.name,
-                self.__count)
+        try:
+            owner = _active[owner].name
+        except KeyError:
+            pass
+        return "<%s owner=%r count=%d>" % (
+                self.__class__.__name__, owner, self.__count)
 
     def acquire(self, blocking=1):
-        me = current_thread()
-        if self.__owner is me:
+        me = _get_ident()
+        if self.__owner == me:
             self.__count = self.__count + 1
             if __debug__:
                 self._note("%s.acquire(%s): recursive success", self, blocking)
@@ -132,7 +134,7 @@ class _RLock(_Verbose):
     __enter__ = acquire
 
     def release(self):
-        if self.__owner is not current_thread():
+        if self.__owner != _get_ident():
             raise RuntimeError("cannot release un-acquired lock")
         self.__count = count = self.__count - 1
         if not count:
@@ -168,7 +170,7 @@ class _RLock(_Verbose):
         return (count, owner)
 
     def _is_owned(self):
-        return self.__owner is current_thread()
+        return self.__owner == _get_ident()
 
 
 def Condition(*args, **kwargs):
index 71d613bf6f9b0c92b1bfde78758cc7554f61b40e..6d421872649e3947eaaaa2c7edaa5877e6b9db06 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -24,6 +24,10 @@ Core and Builtins
 Library
 -------
 
+- Issue #7282: Fix a memory leak when an RLock was used in a thread other
+  than those started through `threading.Thread` (for example, using
+  `thread.start_new_thread()`.
+
 - Issue #7264: Fix a possible deadlock when deallocating thread-local objects
   which are part of a reference cycle.
 
@@ -71,6 +75,9 @@ Build
 Tests
 -----
 
+- Issue #7270: Add some dedicated unit tests for multi-thread synchronization
+  primitives such as Lock, RLock, Condition, Event and Semaphore.
+
 - Issue #7055: test___all__ now greedily detects all modules which have an
   __all__ attribute, rather than using a hardcoded and incomplete list.