]> granicus.if.org Git - python/commitdiff
(merge 3.2) Issue #12469: Run wakeup and pending signal tests in a subprocess
authorVictor Stinner <victor.stinner@haypocalc.com>
Mon, 4 Jul 2011 15:49:40 +0000 (17:49 +0200)
committerVictor Stinner <victor.stinner@haypocalc.com>
Mon, 4 Jul 2011 15:49:40 +0000 (17:49 +0200)
to run the test in a fresh process with only one thread and to not change
signal handling of the parent process.

1  2 
Lib/test/test_signal.py
Misc/NEWS

index e5df000c737196b42784de8192a1283669a898a0,8df1bf0a192323df2d5234b4088466cf0b8e13c5..e23b126d52a58fdefb5e23fcb3e8c05767627d8a
@@@ -224,117 -233,80 +224,115 @@@ class WindowsSignalTests(unittest.TestC
  
  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  class WakeupSignalTests(unittest.TestCase):
-     TIMEOUT_FULL = 10
-     TIMEOUT_HALF = 5
 -    def check_wakeup(self, test_body):
 -        # use a subprocess to have only one thread and to not change signal
 -        # handling of the parent process
++    def check_wakeup(self, test_body, *signals):
++        # use a subprocess to have only one thread
+         code = """if 1:
+         import fcntl
+         import os
+         import signal
++        import struct
 +
-     def handler(self, signum, frame):
-         pass
++        signals = {!r}
  
-     def check_signum(self, *signals):
-         data = os.read(self.read, len(signals)+1)
-         raised = struct.unpack('%uB' % len(data), data)
-         # We don't care of the signal delivery order (it's not portable or
-         # reliable)
-         raised = set(raised)
-         signals = set(signals)
-         self.assertEqual(raised, signals)
+         def handler(signum, frame):
+             pass
  
-     def test_wakeup_fd_early(self):
-         import select
-         signal.alarm(1)
-         before_time = time.time()
-         # We attempt to get a signal during the sleep,
-         # before select is called
-         time.sleep(self.TIMEOUT_FULL)
-         mid_time = time.time()
-         self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
-         select.select([self.read], [], [], self.TIMEOUT_FULL)
-         after_time = time.time()
-         self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
-         self.check_signum(signal.SIGALRM)
++        def check_signum(signals):
++            data = os.read(read, len(signals)+1)
++            raised = struct.unpack('%uB' % len(data), data)
++            # We don't care of the signal delivery order (it's not portable or
++            # reliable)
++            raised = set(raised)
++            signals = set(signals)
++            assert raised == signals, "%r != %r" % (raised, signals)
 +
-     def test_wakeup_fd_during(self):
-         import select
+         {}
  
-         signal.alarm(1)
-         before_time = time.time()
-         # We attempt to get a signal during the select call
-         self.assertRaises(select.error, select.select,
-             [self.read], [], [], self.TIMEOUT_FULL)
-         after_time = time.time()
-         self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
-         self.check_signum(signal.SIGALRM)
+         signal.signal(signal.SIGALRM, handler)
+         read, write = os.pipe()
 -        flags = fcntl.fcntl(write, fcntl.F_GETFL, 0)
 -        flags = flags | os.O_NONBLOCK
 -        fcntl.fcntl(write, fcntl.F_SETFL, flags)
++        for fd in (read, write):
++            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
++            flags = flags | os.O_NONBLOCK
++            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+         signal.set_wakeup_fd(write)
  
-     def test_signum(self):
-         old_handler = signal.signal(signal.SIGUSR1, self.handler)
-         self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
-         os.kill(os.getpid(), signal.SIGUSR1)
-         os.kill(os.getpid(), signal.SIGALRM)
-         self.check_signum(signal.SIGUSR1, signal.SIGALRM)
+         test()
++        check_signum(signals)
  
-     @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
-                          'need signal.pthread_sigmask()')
-     @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
-                          'need signal.pthread_kill()')
-     def test_pending(self):
-         signum1 = signal.SIGUSR1
-         signum2 = signal.SIGUSR2
-         tid = threading.current_thread().ident
+         os.close(read)
+         os.close(write)
 -        """.format(test_body)
++        """.format(signals, test_body)
  
-         old_handler = signal.signal(signum1, self.handler)
-         self.addCleanup(signal.signal, signum1, old_handler)
-         old_handler = signal.signal(signum2, self.handler)
-         self.addCleanup(signal.signal, signum2, old_handler)
+         assert_python_ok('-c', code)
  
-         signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
-         signal.pthread_kill(tid, signum1)
-         signal.pthread_kill(tid, signum2)
-         # Unblocking the 2 signals calls the C signal handler twice
-         signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
+     def test_wakeup_fd_early(self):
+         self.check_wakeup("""def test():
+             import select
+             import time
  
-         self.check_signum(signum1, signum2)
+             TIMEOUT_FULL = 10
+             TIMEOUT_HALF = 5
  
-     @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
-                          'need signal.pthread_kill()')
-     def test_pthread_kill_main_thread(self):
-         # Test that a signal can be sent to the main thread with pthread_kill()
-         # before any other thread has been created (see issue #12392).
-         code = """if True:
-             import threading
-             import signal
-             import sys
+             signal.alarm(1)
+             before_time = time.time()
+             # We attempt to get a signal during the sleep,
+             # before select is called
+             time.sleep(TIMEOUT_FULL)
+             mid_time = time.time()
+             dt = mid_time - before_time
 -            if dt >= TIMEOUT_HALF:
 -                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
++            assert dt < TIMEOUT_HALF, dt
+             select.select([read], [], [], TIMEOUT_FULL)
+             after_time = time.time()
+             dt = after_time - mid_time
 -            if dt >= TIMEOUT_HALF:
 -                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
 -        """)
++            assert dt < TIMEOUT_HALF, dt
++        """, signal.SIGALRM)
  
-             def handler(signum, frame):
-                 sys.exit(3)
+     def test_wakeup_fd_during(self):
+         self.check_wakeup("""def test():
+             import select
+             import time
  
 -            if dt >= TIMEOUT_HALF:
 -                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
 -        """)
+             TIMEOUT_FULL = 10
+             TIMEOUT_HALF = 5
+             signal.alarm(1)
+             before_time = time.time()
+             # We attempt to get a signal during the select call
+             try:
+                 select.select([read], [], [], TIMEOUT_FULL)
+             except select.error:
+                 pass
+             else:
+                 raise Exception("select.error not raised")
+             after_time = time.time()
+             dt = after_time - before_time
-             signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
-             sys.exit(1)
-         """
++            assert dt < TIMEOUT_HALF, dt
++        """, signal.SIGALRM)
++
++    def test_signum(self):
++        self.check_wakeup("""def test():
 +            signal.signal(signal.SIGUSR1, handler)
-         with spawn_python('-c', code) as process:
-             stdout, stderr = process.communicate()
-             exitcode = process.wait()
-             if exitcode != 3:
-                 raise Exception("Child error (exit code %s): %s" %
-                                 (exitcode, stdout))
++            os.kill(os.getpid(), signal.SIGUSR1)
++            os.kill(os.getpid(), signal.SIGALRM)
++        """, signal.SIGUSR1, signal.SIGALRM)
 +
-     def setUp(self):
-         import fcntl
++    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
++                         'need signal.pthread_sigmask()')
++    def test_pending(self):
++        self.check_wakeup("""def test():
++            signum1 = signal.SIGUSR1
++            signum2 = signal.SIGUSR2
 +
-         self.alrm = signal.signal(signal.SIGALRM, self.handler)
-         self.read, self.write = os.pipe()
-         flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
-         flags = flags | os.O_NONBLOCK
-         fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
-         self.old_wakeup = signal.set_wakeup_fd(self.write)
++            signal.signal(signum1, handler)
++            signal.signal(signum2, handler)
 +
-     def tearDown(self):
-         signal.set_wakeup_fd(self.old_wakeup)
-         os.close(self.read)
-         os.close(self.write)
-         signal.signal(signal.SIGALRM, self.alrm)
++            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
++            os.kill(os.getpid(), signum1)
++            os.kill(os.getpid(), signum2)
++            # Unblocking the 2 signals calls the C signal handler twice
++            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
++        """,  signal.SIGUSR1, signal.SIGUSR2)
 +
  
  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  class SiginterruptTest(unittest.TestCase):
@@@ -513,332 -511,11 +511,352 @@@ class ItimerTest(unittest.TestCase)
          # and the handler should have been called
          self.assertEqual(self.hndl_called, True)
  
-     def setUp(self):
-         self.has_pthread_kill = hasattr(signal, 'pthread_kill')
-     def handler(self, signum, frame):
-         1/0
-     def read_sigmask(self):
-         return signal.pthread_sigmask(signal.SIG_BLOCK, [])
-     def can_test_blocked_signals(self, skip):
-         """
-         Check if a blocked signal can be raised to the main thread without
-         calling its signal handler. We need pthread_kill() or exactly one
-         thread (the main thread).
-         Return True if it's possible. Otherwise, return False and print a
-         warning if skip is False, or raise a SkipTest exception if skip is
-         True.
-         """
-         if self.has_pthread_kill:
-             return True
-         # The fault handler timeout thread masks all signals. If the main
-         # thread masks also SIGUSR1, all threads mask this signal. In this
-         # case, if we send SIGUSR1 to the process, the signal is pending in the
-         # main or the faulthandler timeout thread.  Unblock SIGUSR1 in the main
-         # thread calls the signal handler only if the signal is pending for the
-         # main thread. Stop the faulthandler timeout thread to workaround this
-         # problem.
-         import faulthandler
-         faulthandler.cancel_dump_tracebacks_later()
-         # Issue #11998: The _tkinter module loads the Tcl library which
-         # creates a thread waiting events in select(). This thread receives
-         # signals blocked by all other threads. We cannot test blocked
-         # signals
-         if '_tkinter' in sys.modules:
-             message = ("_tkinter is loaded and pthread_kill() is missing, "
-                        "cannot test blocked signals (issue #11998)")
-             if skip:
-                 self.skipTest(message)
-             else:
-                 print("WARNING: %s" % message)
-             return False
-         return True
-     def kill(self, signum):
-         if self.has_pthread_kill:
-             tid = threading.get_ident()
-             signal.pthread_kill(tid, signum)
-         else:
-             pid = os.getpid()
-             os.kill(pid, signum)
 +
 +class PendingSignalsTests(unittest.TestCase):
 +    """
 +    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
 +    functions.
 +    """
-         self.can_test_blocked_signals(True)
 +    @unittest.skipUnless(hasattr(signal, 'sigpending'),
 +                         'need signal.sigpending()')
 +    def test_sigpending_empty(self):
 +        self.assertEqual(signal.sigpending(), set())
 +
 +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
 +                         'need signal.pthread_sigmask()')
 +    @unittest.skipUnless(hasattr(signal, 'sigpending'),
 +                         'need signal.sigpending()')
 +    def test_sigpending(self):
-         signum = signal.SIGUSR1
-         old_handler = signal.signal(signum, self.handler)
-         self.addCleanup(signal.signal, signum, old_handler)
++        code = """if 1:
++            import os
++            import signal
 +
-         signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
-         self.kill(signum)
-         self.assertEqual(signal.sigpending(), {signum})
-         with self.assertRaises(ZeroDivisionError):
-             signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
++            def handler(signum, frame):
++                1/0
 +
-         signum = signal.SIGUSR1
-         current = threading.get_ident()
++            signum = signal.SIGUSR1
++            signal.signal(signum, handler)
++
++            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
++            os.kill(os.getpid(), signum)
++            pending = signal.sigpending()
++            assert pending == {signum}, '%s != {%s}' % (pending, signum)
++            try:
++                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
++            except ZeroDivisionError:
++                pass
++            else:
++                raise Exception("ZeroDivisionError not raised")
++        """
++        assert_python_ok('-c', code)
 +
 +    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
 +                         'need signal.pthread_kill()')
 +    def test_pthread_kill(self):
-         old_handler = signal.signal(signum, self.handler)
-         self.addCleanup(signal.signal, signum, old_handler)
++        code = """if 1:
++            import signal
++            import threading
++            import sys
 +
-         with self.assertRaises(ZeroDivisionError):
-             signal.pthread_kill(current, signum)
++            signum = signal.SIGUSR1
++
++            def handler(signum, frame):
++                1/0
 +
-     def wait_helper(self, test, blocked):
++            signal.signal(signum, handler)
++
++            if sys.platform == 'freebsd6':
++                # Issue #12392 and #12469: send a signal to the main thread
++                # doesn't work before the creation of the first thread on
++                # FreeBSD 6
++                def noop():
++                    pass
++                thread = threading.Thread(target=noop)
++                thread.start()
++                thread.join()
++
++            tid = threading.get_ident()
++            try:
++                signal.pthread_kill(tid, signum)
++            except ZeroDivisionError:
++                pass
++            else:
++                raise Exception("ZeroDivisionError not raised")
++        """
++        assert_python_ok('-c', code)
 +
 +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
 +                         'need signal.pthread_sigmask()')
-         code = '''
- import signal
- import sys
++    def wait_helper(self, blocked, test):
 +        """
 +        test: body of the "def test(signum):" function.
 +        blocked: number of the blocked signal
 +        """
- def handler(signum, frame):
-     1/0
++        code = '''if 1:
++        import signal
++        import sys
 +
- def test(signum):
- %s
++        def handler(signum, frame):
++            1/0
 +
- blocked = %s
- signum = signal.SIGALRM
++        %s
 +
- # child: block and wait the signal
- try:
-     signal.signal(signum, handler)
-     signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
++        blocked = %s
++        signum = signal.SIGALRM
 +
-     # Do the tests
-     test(signum)
++        # child: block and wait the signal
++        try:
++            signal.signal(signum, handler)
++            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
 +
-     # The handler must not be called on unblock
-     try:
-         signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
-     except ZeroDivisionError:
-         print("the signal handler has been called",
-               file=sys.stderr)
-         sys.exit(1)
- except BaseException as err:
-     print("error: {}".format(err), file=sys.stderr)
-     sys.stderr.flush()
-     sys.exit(1)
- ''' % (test, blocked)
++            # Do the tests
++            test(signum)
 +
-         test = '''
++            # The handler must not be called on unblock
++            try:
++                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
++            except ZeroDivisionError:
++                print("the signal handler has been called",
++                      file=sys.stderr)
++                sys.exit(1)
++        except BaseException as err:
++            print("error: {}".format(err), file=sys.stderr)
++            sys.stderr.flush()
++            sys.exit(1)
++        ''' % (test.strip(), blocked)
 +
 +        # sig*wait* must be called with the signal blocked: since the current
 +        # process might have several threads running, use a subprocess to have
 +        # a single thread.
 +        assert_python_ok('-c', code)
 +
 +    @unittest.skipUnless(hasattr(signal, 'sigwait'),
 +                         'need signal.sigwait()')
 +    def test_sigwait(self):
-         '''
-         self.wait_helper(test, signal.SIGALRM)
++        self.wait_helper(signal.SIGALRM, '''
++        def test(signum):
 +            signal.alarm(1)
 +            received = signal.sigwait([signum])
 +            assert received == signum , 'received %s, not %s' % (received, signum)
-         test = '''
++        ''')
 +
 +    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
 +                         'need signal.sigwaitinfo()')
 +    def test_sigwaitinfo(self):
-         '''
-         self.wait_helper(test, signal.SIGALRM)
++        self.wait_helper(signal.SIGALRM, '''
++        def test(signum):
 +            signal.alarm(1)
 +            info = signal.sigwaitinfo([signum])
 +            assert info.si_signo == signum, "info.si_signo != %s" % signum
-         test = '''
++        ''')
 +
 +    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
 +                         'need signal.sigtimedwait()')
 +    def test_sigtimedwait(self):
-         '''
-         self.wait_helper(test, signal.SIGALRM)
++        self.wait_helper(signal.SIGALRM, '''
++        def test(signum):
 +            signal.alarm(1)
 +            info = signal.sigtimedwait([signum], (10, 1000))
 +            assert info.si_signo == signum, 'info.si_signo != %s' % signum
-         'sigtimedwait() with a null timeout doens\'t work on FreeBSD 6')
++        ''')
 +
 +    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
 +                         'need signal.sigtimedwait()')
 +    # issue #12303: sigtimedwait() takes 30 seconds on FreeBSD 6 (kernel bug)
 +    @unittest.skipIf(sys.platform =='freebsd6',
-         test = '''
++        "sigtimedwait() with a null timeout doens't work on FreeBSD 6")
 +    def test_sigtimedwait_poll(self):
 +        # check that polling with sigtimedwait works
-         '''
-         self.wait_helper(test, signal.SIGALRM)
++        self.wait_helper(signal.SIGALRM, '''
++        def test(signum):
 +            import os
 +            os.kill(os.getpid(), signum)
 +            info = signal.sigtimedwait([signum], (0, 0))
 +            assert info.si_signo == signum, 'info.si_signo != %s' % signum
-         test = '''
++        ''')
 +
 +    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
 +                         'need signal.sigtimedwait()')
 +    def test_sigtimedwait_timeout(self):
-         '''
-         self.wait_helper(test, signal.SIGALRM)
++        self.wait_helper(signal.SIGALRM, '''
++        def test(signum):
 +            received = signal.sigtimedwait([signum], (1, 0))
 +            assert received is None, "received=%r" % (received,)
-         test = '''
++        ''')
 +
 +    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
 +                         'need signal.sigtimedwait()')
 +    def test_sigtimedwait_negative_timeout(self):
 +        signum = signal.SIGALRM
 +        self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, -1))
 +        self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1))
 +        self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0))
 +
 +    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
 +                         'need signal.sigwaitinfo()')
 +    def test_sigwaitinfo_interrupted(self):
-         '''
-         self.wait_helper(test, signal.SIGUSR1)
++        self.wait_helper(signal.SIGUSR1, '''
++        def test(signum):
 +            import errno
 +
 +            hndl_called = True
 +            def alarm_handler(signum, frame):
 +                hndl_called = False
 +
 +            signal.signal(signal.SIGALRM, alarm_handler)
 +            signal.alarm(1)
 +            try:
 +                signal.sigwaitinfo([signal.SIGUSR1])
 +            except OSError as e:
 +                if e.errno == errno.EINTR:
 +                    assert hndl_called, "SIGALRM handler not called"
 +                else:
 +                    raise Exception("Expected EINTR to be raised by sigwaitinfo")
 +            else:
 +                raise Exception("Expected EINTR to be raised by sigwaitinfo")
-         test_blocked_signals = self.can_test_blocked_signals(False)
++        ''')
 +
 +    @unittest.skipUnless(hasattr(signal, 'sigwait'),
 +                         'need signal.sigwait()')
 +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
 +                         'need signal.pthread_sigmask()')
 +    @unittest.skipIf(threading is None, "test needs threading module")
 +    def test_sigwait_thread(self):
 +        # Check that calling sigwait() from a thread doesn't suspend the whole
 +        # process. A new interpreter is spawned to avoid problems when mixing
 +        # threads and fork(): only async-safe functions are allowed between
 +        # fork() and exec().
 +        assert_python_ok("-c", """if True:
 +            import os, threading, sys, time, signal
 +
 +            # the default handler terminates the process
 +            signum = signal.SIGUSR1
 +
 +            def kill_later():
 +                # wait until the main thread is waiting in sigwait()
 +                time.sleep(1)
 +                os.kill(os.getpid(), signum)
 +
 +            # the signal must be blocked by all the threads
 +            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
 +            killer = threading.Thread(target=kill_later)
 +            killer.start()
 +            received = signal.sigwait([signum])
 +            if received != signum:
 +                print("sigwait() received %s, not %s" % (received, signum),
 +                      file=sys.stderr)
 +                sys.exit(1)
 +            killer.join()
 +            # unblock the signal, which should have been cleared by sigwait()
 +            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
 +        """)
 +
 +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
 +                         'need signal.pthread_sigmask()')
 +    def test_pthread_sigmask_arguments(self):
 +        self.assertRaises(TypeError, signal.pthread_sigmask)
 +        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
 +        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
 +        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
 +
 +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
 +                         'need signal.pthread_sigmask()')
 +    def test_pthread_sigmask(self):
-         old_handler = signal.signal(signum, self.handler)
-         self.addCleanup(signal.signal, signum, old_handler)
++        code = """if 1:
++        import signal
++        import os; import threading
++
++        def handler(signum, frame):
++            1/0
++
++        def kill(signum):
++            os.kill(os.getpid(), signum)
++
++        def read_sigmask():
++            return signal.pthread_sigmask(signal.SIG_BLOCK, [])
++
 +        signum = signal.SIGUSR1
 +
 +        # Install our signal handler
-         self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)
-         with self.assertRaises(ZeroDivisionError):
-             self.kill(signum)
++        old_handler = signal.signal(signum, handler)
 +
 +        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
 +        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
-         if test_blocked_signals:
-             self.kill(signum)
++        try:
++            kill(signum)
++        except ZeroDivisionError:
++            pass
++        else:
++            raise Exception("ZeroDivisionError not raised")
 +
 +        # Block and then raise SIGUSR1. The signal is blocked: the signal
 +        # handler is not called, and the signal is now pending
 +        signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
-         blocked = self.read_sigmask()
-         self.assertIn(signum, blocked)
-         self.assertEqual(old_mask ^ blocked, {signum})
++        kill(signum)
 +
 +        # Check the new mask
-         if test_blocked_signals:
-             with self.assertRaises(ZeroDivisionError):
-                 # unblock the pending signal calls immediatly the signal handler
-                 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
-         else:
++        blocked = read_sigmask()
++        assert signum in blocked, "%s not in %s" % (signum, blocked)
++        assert old_mask ^ blocked == {signum}, "%s ^ %s != {%s}" % (old_mask, blocked, signum)
 +
 +        # Unblock SIGUSR1
-         with self.assertRaises(ZeroDivisionError):
-             self.kill(signum)
++        try:
++            # unblock the pending signal calls immediatly the signal handler
 +            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
-         unblocked = self.read_sigmask()
-         self.assertNotIn(signum, unblocked)
-         self.assertEqual(blocked ^ unblocked, {signum})
-         self.assertSequenceEqual(old_mask, unblocked)
-         # Finally, restore the previous signal handler and the signal mask
++        except ZeroDivisionError:
++            pass
++        else:
++            raise Exception("ZeroDivisionError not raised")
++        try:
++            kill(signum)
++        except ZeroDivisionError:
++            pass
++        else:
++            raise Exception("ZeroDivisionError not raised")
 +
 +        # Check the new mask
++        unblocked = read_sigmask()
++        assert signum not in unblocked, "%s in %s" % (signum, unblocked)
++        assert blocked ^ unblocked == {signum}, "%s ^ %s != {%s}" % (blocked, unblocked, signum)
++        assert old_mask == unblocked, "%s != %s" % (old_mask, unblocked)
++        """
++        assert_python_ok('-c', code)
++
++    @unittest.skipIf(sys.platform == 'freebsd6',
++        "issue #12392: send a signal to the main thread doesn't work "
++        "before the creation of the first thread on FreeBSD 6")
++    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
++                         'need signal.pthread_kill()')
++    def test_pthread_kill_main_thread(self):
++        # Test that a signal can be sent to the main thread with pthread_kill()
++        # before any other thread has been created (see issue #12392).
++        code = """if True:
++            import threading
++            import signal
++            import sys
++
++            def handler(signum, frame):
++                sys.exit(3)
++
++            signal.signal(signal.SIGUSR1, handler)
++            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
++            sys.exit(2)
++        """
++
++        with spawn_python('-c', code) as process:
++            stdout, stderr = process.communicate()
++            exitcode = process.wait()
++            if exitcode != 3:
++                raise Exception("Child error (exit code %s): %s" %
++                                (exitcode, stdout))
 +
 +
  def test_main():
      try:
 -        support.run_unittest(BasicSignalTests, InterProcessSignalTests,
 +        support.run_unittest(PosixTests, InterProcessSignalTests,
                               WakeupSignalTests, SiginterruptTest,
 -                             ItimerTest, WindowsSignalTests)
 +                             ItimerTest, WindowsSignalTests,
 +                             PendingSignalsTests)
      finally:
          support.reap_children()
  
diff --cc Misc/NEWS
index 6678aceefe5f6c17c95bbcc60618b4f96dcff99c,eb9b636a3dd7a942e23e6d905953f9e5d42c54e2..517c611affc27df9f42a21fcc18ff6274b26c703
+++ b/Misc/NEWS
@@@ -981,46 -637,6 +981,50 @@@ Extension Module
  Tests
  -----
  
++- Issue #12469: Run wakeup and pending signal tests in a subprocess to run the
++  test in a fresh process with only one thread and to not change signal
++  handling of the parent process.
++
 +- Issue #8716: Avoid crashes caused by Aqua Tk on OSX when attempting to run
 +  test_tk or test_ttk_guionly under a username that is not currently logged
 +  in to the console windowserver (as may be the case under buildbot or ssh).
 +
 +- Issue #12407: Explicitly skip test_capi.EmbeddingTest under Windows.
 +
 +- Issue #12400: regrtest -W doesn't rerun the tests twice anymore, but captures
 +  the output and displays it on failure instead. regrtest -v doesn't print the
 +  error twice anymore if there is only one error.
 +
 +- Issue #12141: Install copies of template C module file so that
 +  test_build_ext of test_distutils and test_command_build_ext of
 +  test_packaging are no longer silently skipped when
 +  run outside of a build directory.
 +
 +- Issue #8746: Add additional tests for os.chflags() and os.lchflags().
 +  Patch by Garrett Cooper.
 +
 +- Issue #10736: Fix test_ttk test_widgets failures with Cocoa Tk 8.5.9
 +  2.8 +  on Mac OS X.  (Patch by Ronald Oussoren)
 +
 +- Issue #12057: Add tests for ISO 2022 codecs (iso2022_jp, iso2022_jp_2,
 +  iso2022_kr).
 +
 +- Issue #12180: Fixed a few remaining errors in test_packaging when no
 +  threading.
 +
 +- Issue #12120, #12119: skip a test in packaging and distutils
 +  if sys.dont_write_bytecode is set to True.
 +
 +- Issue #12096: Fix a race condition in test_threading.test_waitfor(). Patch
 +  written by Charles-François Natali.
 +
 +- Issue #11614: import __hello__ prints "Hello World!". Patch written by
 +  Andreas Stührk.
 +
 +- Issue #5723: Improve json tests to be executed with and without accelerations.
 +
 +- Issue #12041: Make test_wait3 more robust.
 +
  - Issue #11873: Change regex in test_compileall to fix occasional failures when
    when the randomly generated temporary path happened to match the regex.