finally:
sys.settrace(old_trace)
+ @cpython_only
+ def test_shutdown_locks(self):
+ for daemon in (False, True):
+ with self.subTest(daemon=daemon):
+ event = threading.Event()
+ thread = threading.Thread(target=event.wait, daemon=daemon)
+
+ # Thread.start() must add lock to _shutdown_locks,
+ # but only for non-daemon thread
+ thread.start()
+ tstate_lock = thread._tstate_lock
+ if not daemon:
+ self.assertIn(tstate_lock, threading._shutdown_locks)
+ else:
+ self.assertNotIn(tstate_lock, threading._shutdown_locks)
+
+ # unblock the thread and join it
+ event.set()
+ thread.join()
+
+ # Thread._stop() must remove tstate_lock from _shutdown_locks.
+ # Daemon threads must never add it to _shutdown_locks.
+ self.assertNotIn(tstate_lock, threading._shutdown_locks)
+
class ThreadJoinOnShutdown(BaseTestCase):
self._tstate_lock = None
if not self.daemon:
with _shutdown_locks_lock:
- _shutdown_locks.discard(self._tstate_lock)
+ _shutdown_locks.discard(lock)
def _delete(self):
"Remove current thread from the dict of currently running threads."