self.assertFalse(t.is_alive())
# And verify the thread disposed of _tstate_lock.
self.assertIsNone(t._tstate_lock)
+ t.join()
def test_repr_stopped(self):
# Verify that "stopped" shows up in repr(Thread) appropriately.
break
time.sleep(0.01)
self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
+ t.join()
def test_BoundedSemaphore_limit(self):
# BoundedSemaphore should raise ValueError if released too often.
thread = threading.Thread()
thread.start()
self.assertRaises(RuntimeError, thread.start)
+ thread.join()
def test_joining_current_thread(self):
current_thread = threading.current_thread()
thread = threading.Thread()
thread.start()
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
+ thread.join()
def test_releasing_unacquired_lock(self):
lock = threading.Lock()