self.started = []
self.finished = []
self._can_exit = not wait_before_exit
+ self.wait_thread = support.wait_threads_exit()
+ self.wait_thread.__enter__()
+
def task():
tid = threading.get_ident()
self.started.append(tid)
self.finished.append(tid)
while not self._can_exit:
_wait()
+
try:
for i in range(n):
start_new_thread(task, ())
def wait_for_finished(self):
while len(self.finished) < self.n:
_wait()
- # Wait a little bit longer to prevent the "threading_cleanup()
- # failed to cleanup X threads" warning. The loop above is a weak
- # synchronization. At the C level, t_bootstrap() can still be
- # running and so _thread.count() still accounts the "almost dead"
- # thead.
- for _ in range(self.n):
- _wait()
+ # Wait for threads exit
+ self.wait_thread.__exit__(None, None, None)
def do_finish(self):
self._can_exit = True
# Lock needs to be released before re-acquiring.
lock = self.locktype()
phase = []
+
def f():
lock.acquire()
phase.append(None)
lock.acquire()
phase.append(None)
- start_new_thread(f, ())
- while len(phase) == 0:
- _wait()
- _wait()
- self.assertEqual(len(phase), 1)
- lock.release()
- while len(phase) == 1:
+
+ with support.wait_threads_exit():
+ start_new_thread(f, ())
+ while len(phase) == 0:
+ _wait()
_wait()
- self.assertEqual(len(phase), 2)
+ self.assertEqual(len(phase), 1)
+ lock.release()
+ while len(phase) == 1:
+ _wait()
+ self.assertEqual(len(phase), 2)
def test_different_thread(self):
# Lock can be released from a different thread.
return decorator
+@contextlib.contextmanager
+def wait_threads_exit(timeout=60.0):
+ """
+ bpo-31234: Context manager to wait until all threads created in the with
+ statement exit.
+
+ Use _thread.count() to check if threads exited. Indirectly, wait until
+ threads exit the internal t_bootstrap() C function of the _thread module.
+
+ threading_setup() and threading_cleanup() are designed to emit a warning
+ if a test leaves running threads in the background. This context manager
+ is designed to cleanup threads started by the _thread.start_new_thread()
+ which doesn't allow to wait for thread exit, whereas thread.Thread has a
+ join() method.
+ """
+ old_count = _thread._count()
+ try:
+ yield
+ finally:
+ start_time = time.monotonic()
+ deadline = start_time + timeout
+ while True:
+ count = _thread._count()
+ if count <= old_count:
+ break
+ if time.monotonic() > deadline:
+ dt = time.monotonic() - start_time
+ msg = (f"wait_threads() failed to cleanup {count - old_count} "
+ f"threads after {dt:.1f} seconds "
+ f"(count: {count}, old count: {old_count})")
+ raise AssertionError(msg)
+ time.sleep(0.010)
+ gc_collect()
+
+
def reap_children():
"""Use this function at the end of test_main() whenever sub-processes
are started. This will help ensure that no extra children (zombies)
self.server_ready.set()
def _setUp(self):
+ self.wait_threads = support.wait_threads_exit()
+ self.wait_threads.__enter__()
+
self.server_ready = threading.Event()
self.client_ready = threading.Event()
self.done = threading.Event()
def _tearDown(self):
self.__tearDown()
self.done.wait()
+ self.wait_threads.__exit__(None, None, None)
if self.queue.qsize():
exc = self.queue.get()
self.done_mutex.release()
def test_starting_threads(self):
- # Basic test for thread creation.
- for i in range(NUMTASKS):
- self.newtask()
- verbose_print("waiting for tasks to complete...")
- self.done_mutex.acquire()
- verbose_print("all tasks done")
+ with support.wait_threads_exit():
+ # Basic test for thread creation.
+ for i in range(NUMTASKS):
+ self.newtask()
+ verbose_print("waiting for tasks to complete...")
+ self.done_mutex.acquire()
+ verbose_print("all tasks done")
def test_stack_size(self):
# Various stack size tests.
verbose_print("trying stack_size = (%d)" % tss)
self.next_ident = 0
self.created = 0
- for i in range(NUMTASKS):
- self.newtask()
+ with support.wait_threads_exit():
+ for i in range(NUMTASKS):
+ self.newtask()
- verbose_print("waiting for all tasks to complete")
- self.done_mutex.acquire()
- verbose_print("all tasks done")
+ verbose_print("waiting for all tasks to complete")
+ self.done_mutex.acquire()
+ verbose_print("all tasks done")
thread.stack_size(0)
mut = thread.allocate_lock()
mut.acquire()
started = []
+
def task():
started.append(None)
mut.acquire()
mut.release()
- thread.start_new_thread(task, ())
- while not started:
- time.sleep(POLL_SLEEP)
- self.assertEqual(thread._count(), orig + 1)
- # Allow the task to finish.
- mut.release()
- # The only reliable way to be sure that the thread ended from the
- # interpreter's point of view is to wait for the function object to be
- # destroyed.
- done = []
- wr = weakref.ref(task, lambda _: done.append(None))
- del task
- while not done:
- time.sleep(POLL_SLEEP)
- self.assertEqual(thread._count(), orig)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(task, ())
+ while not started:
+ time.sleep(POLL_SLEEP)
+ self.assertEqual(thread._count(), orig + 1)
+ # Allow the task to finish.
+ mut.release()
+ # The only reliable way to be sure that the thread ended from the
+ # interpreter's point of view is to wait for the function object to be
+ # destroyed.
+ done = []
+ wr = weakref.ref(task, lambda _: done.append(None))
+ del task
+ while not done:
+ time.sleep(POLL_SLEEP)
+ self.assertEqual(thread._count(), orig)
def test_save_exception_state_on_error(self):
# See issue #14474
except ValueError:
pass
real_write(self, *args)
- c = thread._count()
started = thread.allocate_lock()
with support.captured_output("stderr") as stderr:
real_write = stderr.write
stderr.write = mywrite
started.acquire()
- thread.start_new_thread(task, ())
- started.acquire()
- while thread._count() > c:
- time.sleep(POLL_SLEEP)
+ with support.wait_threads_exit():
+ thread.start_new_thread(task, ())
+ started.acquire()
self.assertIn("Traceback", stderr.getvalue())
class BarrierTest(BasicThreadTest):
def test_barrier(self):
- self.bar = Barrier(NUMTASKS)
- self.running = NUMTASKS
- for i in range(NUMTASKS):
- thread.start_new_thread(self.task2, (i,))
- verbose_print("waiting for tasks to end")
- self.done_mutex.acquire()
- verbose_print("tasks done")
+ with support.wait_threads_exit():
+ self.bar = Barrier(NUMTASKS)
+ self.running = NUMTASKS
+ for i in range(NUMTASKS):
+ thread.start_new_thread(self.task2, (i,))
+ verbose_print("waiting for tasks to end")
+ self.done_mutex.acquire()
+ verbose_print("tasks done")
def task2(self, ident):
for i in range(NUMTRIPS):
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
@support.reap_threads
def test_forkinthread(self):
- running = True
status = "not set"
def thread1():
- nonlocal running, status
+ nonlocal status
# fork in a thread
pid = os.fork()
# parent
os.close(self.write_fd)
pid, status = os.waitpid(pid, 0)
- running = False
- thread.start_new_thread(thread1, ())
- self.assertEqual(os.read(self.read_fd, 2), b"OK",
- "Unable to fork() in thread")
- while running:
- time.sleep(POLL_SLEEP)
+ with support.wait_threads_exit():
+ thread.start_new_thread(thread1, ())
+ self.assertEqual(os.read(self.read_fd, 2), b"OK",
+ "Unable to fork() in thread")
self.assertEqual(status, 0)
def tearDown(self):
done.set()
done = threading.Event()
ident = []
- _thread.start_new_thread(f, ())
- done.wait()
- self.assertIsNotNone(ident[0])
+ with support.wait_threads_exit():
+ tid = _thread.start_new_thread(f, ())
+ done.wait()
+ self.assertEqual(ident[0], tid)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
mutex = threading.Lock()
mutex.acquire()
- tid = _thread.start_new_thread(f, (mutex,))
- # Wait for the thread to finish.
- mutex.acquire()
+ with support.wait_threads_exit():
+ tid = _thread.start_new_thread(f, (mutex,))
+ # Wait for the thread to finish.
+ mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
#Issue 29376
import signal
import os
import sys
-from test.support import run_unittest, import_module
-thread = import_module('_thread')
+from test import support
+thread = support.import_module('_thread')
import time
if (sys.platform[:3] == 'win'):
class ThreadSignals(unittest.TestCase):
def test_signals(self):
- # Test signal handling semantics of threads.
- # We spawn a thread, have the thread send two signals, and
- # wait for it to finish. Check that we got both signals
- # and that they were run by the main thread.
- signalled_all.acquire()
- self.spawnSignallingThread()
- signalled_all.acquire()
+ with support.wait_threads_exit():
+ # Test signal handling semantics of threads.
+ # We spawn a thread, have the thread send two signals, and
+ # wait for it to finish. Check that we got both signals
+ # and that they were run by the main thread.
+ signalled_all.acquire()
+ self.spawnSignallingThread()
+ signalled_all.acquire()
+
# the signals that we asked the kernel to send
# will come back, but we don't know when.
# (it might even be after the thread exits
# thread.
def other_thread():
rlock.acquire()
- thread.start_new_thread(other_thread, ())
- # Wait until we can't acquire it without blocking...
- while rlock.acquire(blocking=False):
- rlock.release()
- time.sleep(0.01)
- signal.alarm(1)
- t1 = time.time()
- self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
- dt = time.time() - t1
- # See rationale above in test_lock_acquire_interruption
- self.assertLess(dt, 3.0)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while rlock.acquire(blocking=False):
+ rlock.release()
+ time.sleep(0.01)
+ signal.alarm(1)
+ t1 = time.time()
+ self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
+ dt = time.time() - t1
+ # See rationale above in test_lock_acquire_interruption
+ self.assertLess(dt, 3.0)
finally:
signal.signal(signal.SIGALRM, oldalrm)
self.sig_recvd = False
def my_handler(signal, frame):
self.sig_recvd = True
+
old_handler = signal.signal(signal.SIGUSR1, my_handler)
try:
def other_thread():
# the lock acquisition. Then we'll let it run.
time.sleep(0.5)
lock.release()
- thread.start_new_thread(other_thread, ())
- # Wait until we can't acquire it without blocking...
- while lock.acquire(blocking=False):
- lock.release()
- time.sleep(0.01)
- result = lock.acquire() # Block while we receive a signal.
- self.assertTrue(self.sig_recvd)
- self.assertTrue(result)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while lock.acquire(blocking=False):
+ lock.release()
+ time.sleep(0.01)
+ result = lock.acquire() # Block while we receive a signal.
+ self.assertTrue(self.sig_recvd)
+ self.assertTrue(result)
finally:
signal.signal(signal.SIGUSR1, old_handler)
os.kill(process_pid, signal.SIGUSR1)
done.release()
- # Send the signals from the non-main thread, since the main thread
- # is the only one that can process signals.
- thread.start_new_thread(send_signals, ())
- timed_acquire()
- # Wait for thread to finish
- done.acquire()
- # This allows for some timing and scheduling imprecision
- self.assertLess(self.end - self.start, 2.0)
- self.assertGreater(self.end - self.start, 0.3)
- # If the signal is received several times before PyErr_CheckSignals()
- # is called, the handler will get called less than 40 times. Just
- # check it's been called at least once.
- self.assertGreater(self.sigs_recvd, 0)
+ with support.wait_threads_exit():
+ # Send the signals from the non-main thread, since the main thread
+ # is the only one that can process signals.
+ thread.start_new_thread(send_signals, ())
+ timed_acquire()
+ # Wait for thread to finish
+ done.acquire()
+ # This allows for some timing and scheduling imprecision
+ self.assertLess(self.end - self.start, 2.0)
+ self.assertGreater(self.end - self.start, 0.3)
+ # If the signal is received several times before PyErr_CheckSignals()
+ # is called, the handler will get called less than 40 times. Just
+ # check it's been called at least once.
+ self.assertGreater(self.sigs_recvd, 0)
finally:
signal.signal(signal.SIGUSR1, old_handler)
oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
try:
- run_unittest(ThreadSignals)
+ support.run_unittest(ThreadSignals)
finally:
registerSignals(*oldsigs)