From: Tim Peters Date: Wed, 9 Oct 2013 02:12:58 +0000 (-0500) Subject: Issue 19158: a rare race in BoundedSemaphore could allow .release() too often. X-Git-Tag: v3.4.0a4~204 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=e99bdb9694dd472fda932926d87cd6d783ec5da8;p=python Issue 19158: a rare race in BoundedSemaphore could allow .release() too often. --- e99bdb9694dd472fda932926d87cd6d783ec5da8 diff --cc Lib/test/test_threading.py index 826acbb32e,0ebeb39cbd..4d30ee4e07 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@@ -477,130 -467,34 +477,148 @@@ class ThreadTests(BaseTestCase) pid, status = os.waitpid(pid, 0) self.assertEqual(0, status) + def test_main_thread(self): + main = threading.main_thread() + self.assertEqual(main.name, 'MainThread') + self.assertEqual(main.ident, threading.current_thread().ident) + self.assertEqual(main.ident, threading.get_ident()) + + def f(): + self.assertNotEqual(threading.main_thread().ident, + threading.current_thread().ident) + th = threading.Thread(target=f) + th.start() + th.join() + + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_main_thread_after_fork(self): + code = """if 1: + import os, threading + + pid = os.fork() + if pid == 0: + main = threading.main_thread() + print(main.name) + print(main.ident == threading.current_thread().ident) + print(main.ident == threading.get_ident()) + else: + os.waitpid(pid, 0) + """ + _, out, err = assert_python_ok("-c", code) + data = out.decode().replace('\r', '') + self.assertEqual(err, b"") + self.assertEqual(data, "MainThread\nTrue\nTrue\n") + + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_main_thread_after_fork_from_nonmain_thread(self): + code = """if 1: + import os, threading, sys + + def f(): + pid = os.fork() + if pid == 0: + main = threading.main_thread() + print(main.name) + print(main.ident == threading.current_thread().ident) + print(main.ident == threading.get_ident()) + # stdout is fully buffered because not a tty, + # we have to flush before exit. + sys.stdout.flush() + else: + os.waitpid(pid, 0) + + th = threading.Thread(target=f) + th.start() + th.join() + """ + _, out, err = assert_python_ok("-c", code) + data = out.decode().replace('\r', '') + self.assertEqual(err, b"") + self.assertEqual(data, "Thread-1\nTrue\nTrue\n") + + def test_tstate_lock(self): + # Test an implementation detail of Thread objects. + started = _thread.allocate_lock() + finish = _thread.allocate_lock() + started.acquire() + finish.acquire() + def f(): + started.release() + finish.acquire() + time.sleep(0.01) + # The tstate lock is None until the thread is started + t = threading.Thread(target=f) + self.assertIs(t._tstate_lock, None) + t.start() + started.acquire() + self.assertTrue(t.is_alive()) + # The tstate lock can't be acquired when the thread is running + # (or suspended). + tstate_lock = t._tstate_lock + self.assertFalse(tstate_lock.acquire(timeout=0), False) + finish.release() + # When the thread ends, the state_lock can be successfully + # acquired. + self.assertTrue(tstate_lock.acquire(timeout=5), False) + # But is_alive() is still True: we hold _tstate_lock now, which + # prevents is_alive() from knowing the thread's end-of-life C code + # is done. + self.assertTrue(t.is_alive()) + # Let is_alive() find out the C code is done. + tstate_lock.release() + self.assertFalse(t.is_alive()) + # And verify the thread disposed of _tstate_lock. + self.assertTrue(t._tstate_lock is None) + + def test_repr_stopped(self): + # Verify that "stopped" shows up in repr(Thread) appropriately. + started = _thread.allocate_lock() + finish = _thread.allocate_lock() + started.acquire() + finish.acquire() + def f(): + started.release() + finish.acquire() + t = threading.Thread(target=f) + t.start() + started.acquire() + self.assertIn("started", repr(t)) + finish.release() + # "stopped" should appear in the repr in a reasonable amount of time. + # Implementation detail: as of this writing, that's trivially true + # if .join() is called, and almost trivially true if .is_alive() is + # called. The detail we're testing here is that "stopped" shows up + # "all on its own". + LOOKING_FOR = "stopped" + for i in range(500): + if LOOKING_FOR in repr(t): + break + time.sleep(0.01) + self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds + def test_BoundedSemaphore_limit(self): - # BoundedSemaphore should raise ValueError if released too often. - for limit in range(1, 10): - bs = threading.BoundedSemaphore(limit) - threads = [threading.Thread(target=bs.acquire) - for _ in range(limit)] - for t in threads: - t.start() - for t in threads: - t.join() - threads = [threading.Thread(target=bs.release) - for _ in range(limit)] - for t in threads: - t.start() - for t in threads: - t.join() - self.assertRaises(ValueError, bs.release) ++ # BoundedSemaphore should raise ValueError if released too often. ++ for limit in range(1, 10): ++ bs = threading.BoundedSemaphore(limit) ++ threads = [threading.Thread(target=bs.acquire) ++ for _ in range(limit)] ++ for t in threads: ++ t.start() ++ for t in threads: ++ t.join() ++ threads = [threading.Thread(target=bs.release) ++ for _ in range(limit)] ++ for t in threads: ++ t.start() ++ for t in threads: ++ t.join() ++ self.assertRaises(ValueError, bs.release) + class ThreadJoinOnShutdown(BaseTestCase): - # Between fork() and exec(), only async-safe functions are allowed (issues - # #12316 and #11870), and fork() from a worker thread is known to trigger - # problems with some operating systems (issue #3863): skip problematic tests - # on platforms known to behave badly. - platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', - 'os2emx', 'hp-ux11') - def _run_and_join(self, script): script = """if 1: import sys, os, time, threading