_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
+# Lock that ensures that new workers are not created while the interpreter is
+# shutting down. Must be held while mutating _threads_queues and _shutdown.
+_global_shutdown_lock = threading.Lock()
def _python_exit():
global _shutdown
- _shutdown = True
+ with _global_shutdown_lock:
+ _shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
self._initargs = initargs
def submit(self, fn, /, *args, **kwargs):
- with self._shutdown_lock:
+ with self._shutdown_lock, _global_shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)