import os, sys, time, unittest
import test.support as support
-_thread = support.import_module('_thread')
+
+threading = support.import_module('threading')
LONGSLEEP = 2
SHORTSLEEP = 0.5
class ForkWait(unittest.TestCase):
def setUp(self):
+ self._threading_key = support.threading_setup()
self.alive = {}
self.stop = 0
+ self.threads = []
+
+ def tearDown(self):
+ # Stop threads
+ self.stop = 1
+ for thread in self.threads:
+ thread.join()
+ thread = None
+ self.threads.clear()
+ support.threading_cleanup(*self._threading_key)
def f(self, id):
while not self.stop:
self.assertEqual(spid, cpid)
self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
- @support.reap_threads
def test_wait(self):
for i in range(NUM_THREADS):
- _thread.start_new(self.f, (i,))
+ thread = threading.Thread(target=self.f, args=(i,))
+ thread.start()
+ self.threads.append(thread)
# busy-loop to wait for threads
deadline = time.monotonic() + 10.0
os._exit(n)
else:
# Parent
- try:
- self.wait_impl(cpid)
- finally:
- # Tell threads to die
- self.stop = 1
+ self.wait_impl(cpid)