# And verify the thread disposed of _tstate_lock.
self.assertTrue(t._tstate_lock is None)
+ def test_repr_stopped(self):
+ # Verify that "stopped" shows up in repr(Thread) appropriately.
+ started = _thread.allocate_lock()
+ finish = _thread.allocate_lock()
+ started.acquire()
+ finish.acquire()
+ def f():
+ started.release()
+ finish.acquire()
+ t = threading.Thread(target=f)
+ t.start()
+ started.acquire()
+ self.assertIn("started", repr(t))
+ finish.release()
+ # "stopped" should appear in the repr in a reasonable amount of time.
+ # Implementation detail: as of this writing, that's trivially true
+ # if .join() is called, and almost trivially true if .is_alive() is
+ # called. The detail we're testing here is that "stopped" shows up
+ # "all on its own".
+ LOOKING_FOR = "stopped"
+ for i in range(500):
+ if LOOKING_FOR in repr(t):
+ break
+ time.sleep(0.01)
+ self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
class ThreadJoinOnShutdown(BaseTestCase):