NUMTASKS = 10
NUMTRIPS = 3
+POLL_SLEEP = 0.010 # seconds = 10 ms
_print_mutex = thread.allocate_lock()
mut.release()
thread.start_new_thread(task, ())
while not started:
- time.sleep(0.01)
+ time.sleep(POLL_SLEEP)
self.assertEqual(thread._count(), orig + 1)
# Allow the task to finish.
mut.release()
wr = weakref.ref(task, lambda _: done.append(None))
del task
while not done:
- time.sleep(0.01)
+ time.sleep(POLL_SLEEP)
self.assertEqual(thread._count(), orig)
def test_save_exception_state_on_error(self):
thread.start_new_thread(task, ())
started.acquire()
while thread._count() > c:
- time.sleep(0.01)
+ time.sleep(POLL_SLEEP)
self.assertIn("Traceback", stderr.getvalue())
def setUp(self):
self.read_fd, self.write_fd = os.pipe()
- @unittest.skipIf(sys.platform.startswith('win'),
- "This test is only appropriate for POSIX-like systems.")
+ @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
@support.reap_threads
def test_forkinthread(self):
+ running = True
+ status = "not set"
+
def thread1():
- try:
- pid = os.fork() # fork in a thread
- except RuntimeError:
- os._exit(1) # exit the child
+ nonlocal running, status
- if pid == 0: # child
+ # fork in a thread
+ pid = os.fork()
+ if pid == 0:
+ # child
try:
os.close(self.read_fd)
os.write(self.write_fd, b"OK")
finally:
os._exit(0)
- else: # parent
+ else:
+ # parent
os.close(self.write_fd)
pid, status = os.waitpid(pid, 0)
- self.assertEqual(status, 0)
+ running = False
thread.start_new_thread(thread1, ())
self.assertEqual(os.read(self.read_fd, 2), b"OK",
"Unable to fork() in thread")
+ while running:
+ time.sleep(POLL_SLEEP)
+ self.assertEqual(status, 0)
def tearDown(self):
try: