will then be called. Returns nothing. Not on Windows. (See the Unix man page
:manpage:`signal(2)`.)
+ See also :func:`sigwait` and :func:`sigpending`.
+
+
+.. function:: pthread_kill(thread_id, signum)
+
+ Send the signal *signum* to the thread *thread_id*, another thread in the same
+ process as the caller. The signal is asynchronously directed to thread.
+
+ *thread_id* can be read from the :attr:`~threading.Thread.ident` attribute
+ of :attr:`threading.Thread`. For example,
+ ``threading.current_thread().ident`` gives the identifier of the current
+ thread.
+
+ If *signum* is 0, then no signal is sent, but error checking is still
+ performed; this can be used to check if a thread is still running.
+
+ Availability: Unix (see the man page :manpage:`pthread_kill(3)` for further
+ information).
+
+ See also :func:`os.kill`.
+
+ .. versionadded:: 3.3
+
.. function:: pthread_sigmask(how, mask)
Availability: Unix. See the man page :manpage:`sigprocmask(3)` and
:manpage:`pthread_sigmask(3)` for further information.
+ See also :func:`pause`, :func:`sigpending` and :func:`sigwait`.
+
.. versionadded:: 3.3
:const:`SIGTERM`. A :exc:`ValueError` will be raised in any other case.
+.. function:: sigpending()
+
+ Examine the set of signals that are pending for delivery to the calling
+ thread (i.e., the signals which have been raised while blocked). Return the
+ set of the pending signals.
+
+ Availability: Unix (see the man page :manpage:`sigpending(2)` for further
+ information).
+
+ See also :func:`pause`, :func:`pthread_sigmask` and :func:`sigwait`.
+
+ .. versionadded:: 3.3
+
+
+.. function:: sigwait(sigset)
+
+ Suspend execution of the calling thread until the delivery of one of the
+ signals specified in the signal set *sigset*. The function accepts the signal
+ (removes it from the pending list of signals), and returns the signal number.
+
+ Availability: Unix (see the man page :manpage:`sigwait(3)` for further
+ information).
+
+ See also :func:`pause`, :func:`pthread_sigmask` and :func:`sigpending`.
+
+ .. versionadded:: 3.3
+
+
.. _signal-example:
Example
import subprocess
import traceback
import sys, os, time, errno
+try:
+ import threading
+except ImportError:
+ threading = None
if sys.platform in ('os2', 'riscos'):
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
-class BasicSignalTests(unittest.TestCase):
+class PosixTests(unittest.TestCase):
def trivial_signal_handler(self, *args):
pass
self.assertEqual(self.hndl_called, True)
-@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
class PendingSignalsTests(unittest.TestCase):
"""
- Tests for the pthread_sigmask() function.
+ Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
+ functions.
"""
+ def setUp(self):
+ self.has_pthread_kill = hasattr(signal, 'pthread_kill')
+
def handler(self, signum, frame):
1/0
def read_sigmask(self):
return signal.pthread_sigmask(signal.SIG_BLOCK, [])
- def test_pthread_sigmask_arguments(self):
- self.assertRaises(TypeError, signal.pthread_sigmask)
- self.assertRaises(TypeError, signal.pthread_sigmask, 1)
- self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
- self.assertRaises(RuntimeError, signal.pthread_sigmask, 1700, [])
+ def can_test_blocked_signals(self, skip):
+ """
+ Check if a blocked signal can be raised to the main thread without
+ calling its signal handler. We need pthread_kill() or exactly one
+ thread (the main thread).
- def test_pthread_sigmask(self):
- import faulthandler
- pid = os.getpid()
- signum = signal.SIGUSR1
+ Return True if it's possible. Otherwise, return False and print a
+ warning if skip is False, or raise a SkipTest exception if skip is
+ True.
+ """
+ if self.has_pthread_kill:
+ return True
# The fault handler timeout thread masks all signals. If the main
# thread masks also SIGUSR1, all threads mask this signal. In this
# case, if we send SIGUSR1 to the process, the signal is pending in the
# main or the faulthandler timeout thread. Unblock SIGUSR1 in the main
# thread calls the signal handler only if the signal is pending for the
- # main thread.
- #
- # Stop the faulthandler timeout thread to workaround this problem.
- # Another solution would be to send the signal directly to the main
- # thread using pthread_kill(), but Python doesn't expose this
- # function.
+ # main thread. Stop the faulthandler timeout thread to workaround this
+ # problem.
+ import faulthandler
faulthandler.cancel_dump_tracebacks_later()
- # Issue #11998: The _tkinter module loads the Tcl library which creates
- # a thread waiting events in select(). This thread receives signals
- # blocked by all other threads. We cannot test blocked signals if the
- # _tkinter module is loaded.
- can_test_blocked_signals = ('_tkinter' not in sys.modules)
- if not can_test_blocked_signals:
- print("WARNING: _tkinter is loaded, cannot test signals "
- "blocked by pthread_sigmask() (issue #11998)")
+ # Issue #11998: The _tkinter module loads the Tcl library which
+ # creates a thread waiting events in select(). This thread receives
+ # signals blocked by all other threads. We cannot test blocked
+ # signals
+ if '_tkinter' in sys.modules:
+ message = ("_tkinter is loaded and pthread_kill() is missing, "
+ "cannot test blocked signals (issue #11998)")
+ if skip:
+ self.skipTest(message)
+ else:
+ print("WARNING: %s" % message)
+ return False
+ return True
+
+ def kill(self, signum):
+ if self.has_pthread_kill:
+ tid = threading.current_thread().ident
+ signal.pthread_kill(tid, signum)
+ else:
+ pid = os.getpid()
+ os.kill(pid, signum)
+
+ @unittest.skipUnless(hasattr(signal, 'sigpending'),
+ 'need signal.sigpending()')
+ def test_sigpending_empty(self):
+ self.assertEqual(signal.sigpending(), set())
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ @unittest.skipUnless(hasattr(signal, 'sigpending'),
+ 'need signal.sigpending()')
+ def test_sigpending(self):
+ self.can_test_blocked_signals(True)
+
+ signum = signal.SIGUSR1
+ old_handler = signal.signal(signum, self.handler)
+ self.addCleanup(signal.signal, signum, old_handler)
+
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ self.kill(signum)
+ self.assertEqual(signal.sigpending(), {signum})
+ with self.assertRaises(ZeroDivisionError):
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
+ 'need signal.pthread_kill()')
+ def test_pthread_kill(self):
+ signum = signal.SIGUSR1
+ current = threading.current_thread().ident
+
+ old_handler = signal.signal(signum, self.handler)
+ self.addCleanup(signal.signal, signum, old_handler)
+
+ with self.assertRaises(ZeroDivisionError):
+ signal.pthread_kill(current, signum)
+
+ @unittest.skipUnless(hasattr(signal, 'sigwait'),
+ 'need signal.sigwait()')
+ def test_sigwait(self):
+ old_handler = signal.signal(signal.SIGALRM, self.handler)
+ self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+
+ signal.alarm(1)
+ self.assertEqual(signal.sigwait([signal.SIGALRM]), signal.SIGALRM)
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ def test_pthread_sigmask_arguments(self):
+ self.assertRaises(TypeError, signal.pthread_sigmask)
+ self.assertRaises(TypeError, signal.pthread_sigmask, 1)
+ self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
+ self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ def test_pthread_sigmask(self):
+ test_blocked_signals = self.can_test_blocked_signals(False)
+ signum = signal.SIGUSR1
# Install our signal handler
old_handler = signal.signal(signum, self.handler)
old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)
with self.assertRaises(ZeroDivisionError):
- os.kill(pid, signum)
+ self.kill(signum)
# Block and then raise SIGUSR1. The signal is blocked: the signal
# handler is not called, and the signal is now pending
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- if can_test_blocked_signals:
- os.kill(pid, signum)
+ if test_blocked_signals:
+ self.kill(signum)
# Check the new mask
blocked = self.read_sigmask()
self.assertEqual(old_mask ^ blocked, {signum})
# Unblock SIGUSR1
- if can_test_blocked_signals:
+ if test_blocked_signals:
with self.assertRaises(ZeroDivisionError):
# unblock the pending signal calls immediatly the signal handler
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
else:
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
with self.assertRaises(ZeroDivisionError):
- os.kill(pid, signum)
+ self.kill(signum)
# Check the new mask
unblocked = self.read_sigmask()
def test_main():
try:
- support.run_unittest(BasicSignalTests, InterProcessSignalTests,
+ support.run_unittest(PosixTests, InterProcessSignalTests,
WakeupSignalTests, SiginterruptTest,
ItimerTest, WindowsSignalTests,
PendingSignalsTests)
Returns current value of given itimer.");
#endif
-#ifdef PYPTHREAD_SIGMASK
+#if defined(PYPTHREAD_SIGMASK) || defined(HAVE_SIGWAIT)
/* Convert an iterable to a sigset.
Return 0 on success, return -1 and raise an exception on error. */
Py_XDECREF(iterator);
return result;
}
+#endif
+#if defined(PYPTHREAD_SIGMASK) || defined(HAVE_SIGPENDING)
static PyObject*
sigset_to_set(sigset_t mask)
{
}
return result;
}
+#endif
+#ifdef PYPTHREAD_SIGMASK
static PyObject *
signal_pthread_sigmask(PyObject *self, PyObject *args)
{
err = pthread_sigmask(how, &mask, &previous);
if (err != 0) {
errno = err;
- PyErr_SetFromErrno(PyExc_RuntimeError);
+ PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
#endif /* #ifdef PYPTHREAD_SIGMASK */
+#ifdef HAVE_SIGPENDING
+static PyObject *
+signal_sigpending(PyObject *self)
+{
+ int err;
+ sigset_t mask;
+ err = sigpending(&mask);
+ if (err)
+ return PyErr_SetFromErrno(PyExc_OSError);
+ return sigset_to_set(mask);
+}
+
+PyDoc_STRVAR(signal_sigpending_doc,
+"sigpending() -> list\n\
+\n\
+Examine pending signals.");
+#endif /* #ifdef HAVE_SIGPENDING */
+
+
+#ifdef HAVE_SIGWAIT
+static PyObject *
+signal_sigwait(PyObject *self, PyObject *args)
+{
+ PyObject *signals;
+ sigset_t set;
+ int err, signum;
+
+ if (!PyArg_ParseTuple(args, "O:sigwait", &signals))
+ return NULL;
+
+ if (iterable_to_sigset(signals, &set))
+ return NULL;
+
+ err = sigwait(&set, &signum);
+ if (err) {
+ errno = err;
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+
+ return PyLong_FromLong(signum);
+}
+
+PyDoc_STRVAR(signal_sigwait_doc,
+"sigwait(sigset) -> signum\n\
+\n\
+Wait a signal.");
+#endif /* #ifdef HAVE_SIGPENDING */
+
+
+#if defined(HAVE_PTHREAD_KILL) && defined(WITH_THREAD)
+static PyObject *
+signal_pthread_kill(PyObject *self, PyObject *args)
+{
+ long tid;
+ int signum;
+ int err;
+
+ if (!PyArg_ParseTuple(args, "li:pthread_kill", &tid, &signum))
+ return NULL;
+
+ err = pthread_kill(tid, signum);
+ if (err != 0) {
+ errno = err;
+ PyErr_SetFromErrno(PyExc_OSError);
+ return NULL;
+ }
+
+ /* the signal may have been send to the current thread */
+ if (PyErr_CheckSignals())
+ return NULL;
+
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(signal_pthread_kill_doc,
+"pthread_kill(thread_id, signum)\n\
+\n\
+Send a signal to a thread.");
+#endif /* #if defined(HAVE_PTHREAD_KILL) && defined(WITH_THREAD) */
+
+
+
/* List of functions defined in the module */
static PyMethodDef signal_methods[] = {
#ifdef HAVE_ALARM
#endif
{"default_int_handler", signal_default_int_handler,
METH_VARARGS, default_int_handler_doc},
+#if defined(HAVE_PTHREAD_KILL) && defined(WITH_THREAD)
+ {"pthread_kill", (PyCFunction)signal_pthread_kill,
+ METH_VARARGS, signal_pthread_kill_doc},
+#endif
#ifdef PYPTHREAD_SIGMASK
{"pthread_sigmask", (PyCFunction)signal_pthread_sigmask,
METH_VARARGS, signal_pthread_sigmask_doc},
+#endif
+#ifdef HAVE_SIGPENDING
+ {"sigpending", (PyCFunction)signal_sigpending,
+ METH_NOARGS, signal_sigpending_doc},
+#endif
+#ifdef HAVE_SIGWAIT
+ {"sigwait", (PyCFunction)signal_sigwait,
+ METH_VARARGS, signal_sigwait_doc},
#endif
{NULL, NULL} /* sentinel */
};