Fetch and/or change the signal mask of the calling thread. The signal mask
is the set of signals whose delivery is currently blocked for the caller.
- The old signal mask is returned.
+ Return the old signal mask as a set of signals.
The behavior of the call is dependent on the value of *how*, as follows.
* :data:`SIG_SETMASK`: The set of blocked signals is set to the *mask*
argument.
- *mask* is a list of signal numbers (e.g. [:const:`signal.SIGINT`,
- :const:`signal.SIGTERM`]).
+ *mask* is a set of signal numbers (e.g. {:const:`signal.SIGINT`,
+ :const:`signal.SIGTERM`}). Use ``range(1, signal.NSIG)`` for a full mask
+ including all signals.
For example, ``signal.pthread_sigmask(signal.SIG_BLOCK, [])`` reads the
signal mask of the calling thread.
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
-class PthreadSigmaskTests(unittest.TestCase):
- def test_arguments(self):
+class PendingSignalsTests(unittest.TestCase):
+ """
+ Tests for the pthread_sigmask() function.
+ """
+ def handler(self, signum, frame):
+ 1/0
+
+ def read_sigmask(self):
+ return signal.pthread_sigmask(signal.SIG_BLOCK, [])
+
+ def test_pthread_sigmask_arguments(self):
self.assertRaises(TypeError, signal.pthread_sigmask)
self.assertRaises(TypeError, signal.pthread_sigmask, 1)
self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
self.assertRaises(RuntimeError, signal.pthread_sigmask, 1700, [])
- def test_block_unlock(self):
+ def test_pthread_sigmask(self):
import faulthandler
pid = os.getpid()
signum = signal.SIGUSR1
- def handler(signum, frame):
- 1/0
-
- def read_sigmask():
- return signal.pthread_sigmask(signal.SIG_BLOCK, [])
-
# The fault handler timeout thread masks all signals. If the main
# thread masks also SIGUSR1, all threads mask this signal. In this
# case, if we send SIGUSR1 to the process, the signal is pending in the
"blocked by pthread_sigmask() (issue #11998)")
# Install our signal handler
- old_handler = signal.signal(signum, handler)
+ old_handler = signal.signal(signum, self.handler)
self.addCleanup(signal.signal, signum, old_handler)
# Unblock SIGUSR1 (and copy the old mask) to test our signal handler
os.kill(pid, signum)
# Check the new mask
- blocked = read_sigmask()
+ blocked = self.read_sigmask()
self.assertIn(signum, blocked)
- self.assertEqual(set(old_mask) ^ set(blocked), {signum})
+ self.assertEqual(old_mask ^ blocked, {signum})
# Unblock SIGUSR1
if can_test_blocked_signals:
os.kill(pid, signum)
# Check the new mask
- unblocked = read_sigmask()
+ unblocked = self.read_sigmask()
self.assertNotIn(signum, unblocked)
- self.assertEqual(set(blocked) ^ set(unblocked), {signum})
+ self.assertEqual(blocked ^ unblocked, {signum})
self.assertSequenceEqual(old_mask, unblocked)
# Finally, restore the previous signal handler and the signal mask
support.run_unittest(BasicSignalTests, InterProcessSignalTests,
WakeupSignalTests, SiginterruptTest,
ItimerTest, WindowsSignalTests,
- PthreadSigmaskTests)
+ PendingSignalsTests)
finally:
support.reap_children()
return result;
}
-static PyObject *
-signal_pthread_sigmask(PyObject *self, PyObject *args)
+static PyObject*
+sigset_to_set(sigset_t mask)
{
- int how, sig;
- PyObject *signals, *result, *signum;
- sigset_t mask, previous;
- int err;
-
- if (!PyArg_ParseTuple(args, "iO:pthread_sigmask", &how, &signals))
- return NULL;
-
- if (iterable_to_sigset(signals, &mask))
- return NULL;
+ PyObject *signum, *result;
+ int sig;
- err = pthread_sigmask(how, &mask, &previous);
- if (err != 0) {
- errno = err;
- PyErr_SetFromErrno(PyExc_RuntimeError);
- return NULL;
- }
-
- /* if signals was unblocked, signal handlers have been called */
- if (PyErr_CheckSignals())
- return NULL;
-
- result = PyList_New(0);
+ result = PySet_New(0);
if (result == NULL)
return NULL;
for (sig = 1; sig < NSIG; sig++) {
- if (sigismember(&previous, sig) != 1)
+ if (sigismember(&mask, sig) != 1)
continue;
/* Handle the case where it is a member by adding the signal to
Py_DECREF(result);
return NULL;
}
- if (PyList_Append(result, signum) == -1) {
+ if (PySet_Add(result, signum) == -1) {
Py_DECREF(signum);
Py_DECREF(result);
return NULL;
return result;
}
+static PyObject *
+signal_pthread_sigmask(PyObject *self, PyObject *args)
+{
+ int how;
+ PyObject *signals;
+ sigset_t mask, previous;
+ int err;
+
+ if (!PyArg_ParseTuple(args, "iO:pthread_sigmask", &how, &signals))
+ return NULL;
+
+ if (iterable_to_sigset(signals, &mask))
+ return NULL;
+
+ err = pthread_sigmask(how, &mask, &previous);
+ if (err != 0) {
+ errno = err;
+ PyErr_SetFromErrno(PyExc_RuntimeError);
+ return NULL;
+ }
+
+ /* if signals was unblocked, signal handlers have been called */
+ if (PyErr_CheckSignals())
+ return NULL;
+
+ return sigset_to_set(previous);
+}
+
PyDoc_STRVAR(signal_pthread_sigmask_doc,
"pthread_sigmask(how, mask) -> old mask\n\
\n\