# Exports only things specified by thread documentation;
# skipping obsolete synonyms allocate(), start_new(), exit_thread().
__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
- 'interrupt_main', 'LockType']
+ 'interrupt_main', 'LockType', 'RLock']
# A dummy value
TIMEOUT_MAX = 2**31
hex(id(self))
)
+
+class RLock(LockType):
+ """Dummy implementation of threading._RLock.
+
+ Re-entrant lock can be aquired multiple times and needs to be released
+ just as many times. This dummy implemention does not check wheter the
+ current thread actually owns the lock, but does accounting on the call
+ counts.
+ """
+ def __init__(self):
+ super().__init__()
+ self._levels = 0
+
+ def acquire(self, waitflag=None, timeout=-1):
+ """Aquire the lock, can be called multiple times in succession.
+ """
+ locked = super().acquire(waitflag, timeout)
+ if locked:
+ self._levels += 1
+ return locked
+
+ def release(self):
+ """Release needs to be called once for every call to acquire().
+ """
+ if self._levels == 0:
+ raise error
+ if self._levels == 1:
+ super().release()
+ self._levels -= 1
+
# Used to signal that interrupt_main was called in a "thread"
_interrupt = False
# True when not executing in a "thread"
self.assertIn("unlocked", repr(self.lock))
+class RLockTests(unittest.TestCase):
+ """Test dummy RLock objects."""
+
+ def setUp(self):
+ self.rlock = _thread.RLock()
+
+ def test_multiple_acquire(self):
+ self.assertIn("unlocked", repr(self.rlock))
+ self.rlock.acquire()
+ self.rlock.acquire()
+ self.assertIn("locked", repr(self.rlock))
+ self.rlock.release()
+ self.assertIn("locked", repr(self.rlock))
+ self.rlock.release()
+ self.assertIn("unlocked", repr(self.rlock))
+ self.assertRaises(RuntimeError, self.rlock.release)
+
+
class MiscTests(unittest.TestCase):
"""Miscellaneous tests."""
func = mock.Mock(side_effect=Exception)
_thread.start_new_thread(func, tuple())
self.assertTrue(mock_print_exc.called)
+
+if __name__ == '__main__':
+ unittest.main()