]> granicus.if.org Git - python/commitdiff
Issue 19158: a rare race in BoundedSemaphore could allow .release() too often.
authorTim Peters <tim@python.org>
Wed, 9 Oct 2013 02:12:58 +0000 (21:12 -0500)
committerTim Peters <tim@python.org>
Wed, 9 Oct 2013 02:12:58 +0000 (21:12 -0500)
1  2 
Lib/test/test_threading.py
Lib/threading.py

index 826acbb32e87366046cea304be40a02c73fb37be,0ebeb39cbdc660516ddf4235187c2f0980d9513d..4d30ee4e07b2119f7040a24a70d4a22225b1da43
@@@ -477,130 -467,34 +477,148 @@@ class ThreadTests(BaseTestCase)
                  pid, status = os.waitpid(pid, 0)
                  self.assertEqual(0, status)
  
 +    def test_main_thread(self):
 +        main = threading.main_thread()
 +        self.assertEqual(main.name, 'MainThread')
 +        self.assertEqual(main.ident, threading.current_thread().ident)
 +        self.assertEqual(main.ident, threading.get_ident())
 +
 +        def f():
 +            self.assertNotEqual(threading.main_thread().ident,
 +                                threading.current_thread().ident)
 +        th = threading.Thread(target=f)
 +        th.start()
 +        th.join()
 +
 +    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
 +    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
 +    def test_main_thread_after_fork(self):
 +        code = """if 1:
 +            import os, threading
 +
 +            pid = os.fork()
 +            if pid == 0:
 +                main = threading.main_thread()
 +                print(main.name)
 +                print(main.ident == threading.current_thread().ident)
 +                print(main.ident == threading.get_ident())
 +            else:
 +                os.waitpid(pid, 0)
 +        """
 +        _, out, err = assert_python_ok("-c", code)
 +        data = out.decode().replace('\r', '')
 +        self.assertEqual(err, b"")
 +        self.assertEqual(data, "MainThread\nTrue\nTrue\n")
 +
 +    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
 +    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
 +    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
 +    def test_main_thread_after_fork_from_nonmain_thread(self):
 +        code = """if 1:
 +            import os, threading, sys
 +
 +            def f():
 +                pid = os.fork()
 +                if pid == 0:
 +                    main = threading.main_thread()
 +                    print(main.name)
 +                    print(main.ident == threading.current_thread().ident)
 +                    print(main.ident == threading.get_ident())
 +                    # stdout is fully buffered because not a tty,
 +                    # we have to flush before exit.
 +                    sys.stdout.flush()
 +                else:
 +                    os.waitpid(pid, 0)
 +
 +            th = threading.Thread(target=f)
 +            th.start()
 +            th.join()
 +        """
 +        _, out, err = assert_python_ok("-c", code)
 +        data = out.decode().replace('\r', '')
 +        self.assertEqual(err, b"")
 +        self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
 +
 +    def test_tstate_lock(self):
 +        # Test an implementation detail of Thread objects.
 +        started = _thread.allocate_lock()
 +        finish = _thread.allocate_lock()
 +        started.acquire()
 +        finish.acquire()
 +        def f():
 +            started.release()
 +            finish.acquire()
 +            time.sleep(0.01)
 +        # The tstate lock is None until the thread is started
 +        t = threading.Thread(target=f)
 +        self.assertIs(t._tstate_lock, None)
 +        t.start()
 +        started.acquire()
 +        self.assertTrue(t.is_alive())
 +        # The tstate lock can't be acquired when the thread is running
 +        # (or suspended).
 +        tstate_lock = t._tstate_lock
 +        self.assertFalse(tstate_lock.acquire(timeout=0), False)
 +        finish.release()
 +        # When the thread ends, the state_lock can be successfully
 +        # acquired.
 +        self.assertTrue(tstate_lock.acquire(timeout=5), False)
 +        # But is_alive() is still True:  we hold _tstate_lock now, which
 +        # prevents is_alive() from knowing the thread's end-of-life C code
 +        # is done.
 +        self.assertTrue(t.is_alive())
 +        # Let is_alive() find out the C code is done.
 +        tstate_lock.release()
 +        self.assertFalse(t.is_alive())
 +        # And verify the thread disposed of _tstate_lock.
 +        self.assertTrue(t._tstate_lock is None)
 +
 +    def test_repr_stopped(self):
 +        # Verify that "stopped" shows up in repr(Thread) appropriately.
 +        started = _thread.allocate_lock()
 +        finish = _thread.allocate_lock()
 +        started.acquire()
 +        finish.acquire()
 +        def f():
 +            started.release()
 +            finish.acquire()
 +        t = threading.Thread(target=f)
 +        t.start()
 +        started.acquire()
 +        self.assertIn("started", repr(t))
 +        finish.release()
 +        # "stopped" should appear in the repr in a reasonable amount of time.
 +        # Implementation detail:  as of this writing, that's trivially true
 +        # if .join() is called, and almost trivially true if .is_alive() is
 +        # called.  The detail we're testing here is that "stopped" shows up
 +        # "all on its own".
 +        LOOKING_FOR = "stopped"
 +        for i in range(500):
 +            if LOOKING_FOR in repr(t):
 +                break
 +            time.sleep(0.01)
 +        self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
  
 -        # BoundedSemaphore should raise ValueError if released too often.
 -        for limit in range(1, 10):
 -            bs = threading.BoundedSemaphore(limit)
 -            threads = [threading.Thread(target=bs.acquire)
 -                       for _ in range(limit)]
 -            for t in threads:
 -                t.start()
 -            for t in threads:
 -                t.join()
 -            threads = [threading.Thread(target=bs.release)
 -                       for _ in range(limit)]
 -            for t in threads:
 -                t.start()
 -            for t in threads:
 -                t.join()
 -            self.assertRaises(ValueError, bs.release)
+     def test_BoundedSemaphore_limit(self):
++       # BoundedSemaphore should raise ValueError if released too often.
++       for limit in range(1, 10):
++           bs = threading.BoundedSemaphore(limit)
++           threads = [threading.Thread(target=bs.acquire)
++                      for _ in range(limit)]
++           for t in threads:
++               t.start()
++           for t in threads:
++               t.join()
++           threads = [threading.Thread(target=bs.release)
++                      for _ in range(limit)]
++           for t in threads:
++               t.start()
++           for t in threads:
++               t.join()
++           self.assertRaises(ValueError, bs.release)
  class ThreadJoinOnShutdown(BaseTestCase):
  
 -    # Between fork() and exec(), only async-safe functions are allowed (issues
 -    # #12316 and #11870), and fork() from a worker thread is known to trigger
 -    # problems with some operating systems (issue #3863): skip problematic tests
 -    # on platforms known to behave badly.
 -    platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
 -                         'os2emx', 'hp-ux11')
 -
      def _run_and_join(self, script):
          script = """if 1:
              import sys, os, time, threading
Simple merge