]> granicus.if.org Git - python/commitdiff
Merger 65032 to py3k
authorJesse Noller <jnoller@gmail.com>
Thu, 17 Jul 2008 16:49:17 +0000 (16:49 +0000)
committerJesse Noller <jnoller@gmail.com>
Thu, 17 Jul 2008 16:49:17 +0000 (16:49 +0000)
Lib/test/test_threading.py
Lib/threading.py
Python/ceval.c

index 4c8af7f8b68d00a4969bd98e71d005425b654a7c..ed6130980179358ced5446e2b90aafd2f267e517 100644 (file)
@@ -324,6 +324,82 @@ class ThreadTests(unittest.TestCase):
                                sys.getrefcount(weak_raising_cyclic_object())))
 
 
+class ThreadJoinOnShutdown(unittest.TestCase):
+
+    def _run_and_join(self, script):
+        script = """if 1:
+            import sys, os, time, threading
+
+            # a thread, which waits for the main program to terminate
+            def joiningfunc(mainthread):
+                mainthread.join()
+                print('end of thread')
+        \n""" + script
+
+        import subprocess
+        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
+        rc = p.wait()
+        self.assertEqual(p.stdout.read(), "end of main\nend of thread\n")
+        self.failIf(rc == 2, "interpreter was blocked")
+        self.failUnless(rc == 0, "Unexpected error")
+
+    def test_1_join_on_shutdown(self):
+        # The usual case: on exit, wait for a non-daemon thread
+        script = """if 1:
+            import os
+            t = threading.Thread(target=joiningfunc,
+                                 args=(threading.current_thread(),))
+            t.start()
+            time.sleep(0.1)
+            print('end of main')
+            """
+        self._run_and_join(script)
+
+
+    def test_2_join_in_forked_process(self):
+        # Like the test above, but from a forked interpreter
+        import os
+        if not hasattr(os, 'fork'):
+            return
+        script = """if 1:
+            childpid = os.fork()
+            if childpid != 0:
+                os.waitpid(childpid, 0)
+                sys.exit(0)
+
+            t = threading.Thread(target=joiningfunc,
+                                 args=(threading.current_thread(),))
+            t.start()
+            print('end of main')
+            """
+        self._run_and_join(script)
+
+    def test_3_join_in_forked_from_thread(self):
+        # Like the test above, but fork() was called from a worker thread
+        # In the forked process, the main Thread object must be marked as stopped.
+        import os
+        if not hasattr(os, 'fork'):
+            return
+        script = """if 1:
+            main_thread = threading.current_thread()
+            def worker():
+                childpid = os.fork()
+                if childpid != 0:
+                    os.waitpid(childpid, 0)
+                    sys.exit(0)
+
+                t = threading.Thread(target=joiningfunc,
+                                     args=(main_thread,))
+                print('end of main')
+                t.start()
+                t.join() # Should not block: main_thread is already stopped
+
+            w = threading.Thread(target=worker)
+            w.start()
+            """
+        self._run_and_join(script)
+
+
 class ThreadingExceptionTests(unittest.TestCase):
     # A RuntimeError should be raised if Thread.start() is called
     # multiple times.
@@ -364,7 +440,9 @@ class ThreadingExceptionTests(unittest.TestCase):
 
 def test_main():
     test.support.run_unittest(ThreadTests,
-                              ThreadingExceptionTests)
+                                   ThreadJoinOnShutdown,
+                                   ThreadingExceptionTests,
+                                   )
 
 if __name__ == "__main__":
     test_main()
index cfa9a106a6afef2d0b8d55b60e7691e73572d513..48321014c2cd4ab61f512419e22361c50b0c22a8 100644 (file)
@@ -783,6 +783,37 @@ except ImportError:
     from _threading_local import local
 
 
+def _after_fork():
+    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
+    # is called from PyOS_AfterFork.  Here we cleanup threading module state
+    # that should not exist after a fork.
+
+    # Reset _active_limbo_lock, in case we forked while the lock was held
+    # by another (non-forked) thread.  http://bugs.python.org/issue874900
+    global _active_limbo_lock
+    _active_limbo_lock = _allocate_lock()
+
+    # fork() only copied the current thread; clear references to others.
+    new_active = {}
+    current = current_thread()
+    with _active_limbo_lock:
+        for ident, thread in _active.items():
+            if thread is current:
+                # There is only one active thread.
+                new_active[ident] = thread
+            else:
+                # All the others are already stopped.
+                # We don't call _Thread__stop() because it tries to acquire
+                # thread._Thread__block which could also have been held while
+                # we forked.
+                thread._Thread__stopped = True
+
+        _limbo.clear()
+        _active.clear()
+        _active.update(new_active)
+        assert len(_active) == 1
+
+
 # Self-test code
 
 def _test():
index c75caf68231a219b43710523654cfb30fa476f27..9601de57711e398c34e7d00645f9ccf5125b5bdc 100644 (file)
@@ -266,6 +266,9 @@ PyEval_ReleaseThread(PyThreadState *tstate)
 void
 PyEval_ReInitThreads(void)
 {
+       PyObject *threading, *result;
+       PyThreadState *tstate;
+
        if (!interpreter_lock)
                return;
        /*XXX Can't use PyThread_free_lock here because it does too
@@ -275,6 +278,23 @@ PyEval_ReInitThreads(void)
        interpreter_lock = PyThread_allocate_lock();
        PyThread_acquire_lock(interpreter_lock, 1);
        main_thread = PyThread_get_thread_ident();
+
+       /* Update the threading module with the new state.
+        */
+       tstate = PyThreadState_GET();
+       threading = PyMapping_GetItemString(tstate->interp->modules,
+                                           "threading");
+       if (threading == NULL) {
+               /* threading not imported */
+               PyErr_Clear();
+               return;
+       }
+       result = PyObject_CallMethod(threading, "_after_fork", NULL);
+       if (result == NULL)
+               PyErr_WriteUnraisable(threading);
+       else
+               Py_DECREF(result);
+       Py_DECREF(threading);
 }
 #endif