HAVE_THREADS = True
except ImportError:
HAVE_THREADS = False
+try:
+ import _testcapi
+except ImportError:
+ _testcapi = None
TIMEOUT = 0.5
3,
'Floating point exception')
- @unittest.skipIf(not hasattr(faulthandler, '_sigbus'),
- "need faulthandler._sigbus()")
+ @unittest.skipIf(_testcapi is None, 'need _testcapi')
+ @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
def test_sigbus(self):
self.check_fatal_error("""
+import _testcapi
import faulthandler
+import signal
+
faulthandler.enable()
-faulthandler._sigbus()
+_testcapi.raise_signal(signal.SIGBUS)
""".strip(),
- 3,
+ 6,
'Bus error')
- @unittest.skipIf(not hasattr(faulthandler, '_sigill'),
- "need faulthandler._sigill()")
+ @unittest.skipIf(_testcapi is None, 'need _testcapi')
+ @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
def test_sigill(self):
self.check_fatal_error("""
+import _testcapi
import faulthandler
+import signal
+
faulthandler.enable()
-faulthandler._sigill()
+_testcapi.raise_signal(signal.SIGILL)
""".strip(),
- 3,
+ 6,
'Illegal instruction')
def test_fatal_error(self):
import threading
except ImportError:
threading = None
+try:
+ import _testcapi
+except ImportError:
+ _testcapi = None
class HandlerBCalled(Exception):
fd = support.make_bad_fd()
self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
+ def test_set_wakeup_fd_result(self):
+ r1, w1 = os.pipe()
+ os.close(r1)
+ self.addCleanup(os.close, w1)
+ r2, w2 = os.pipe()
+ os.close(r2)
+ self.addCleanup(os.close, w2)
+
+ signal.set_wakeup_fd(w1)
+ self.assertIs(signal.set_wakeup_fd(w2), w1)
+ self.assertIs(signal.set_wakeup_fd(-1), w2)
+ self.assertIs(signal.set_wakeup_fd(-1), -1)
+
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
+ @unittest.skipIf(_testcapi is None, 'need _testcapi')
def check_wakeup(self, test_body, *signals, ordered=True):
# use a subprocess to have only one thread
code = """if 1:
+ import _testcapi
import fcntl
import os
import signal
assert_python_ok('-c', code)
+ @unittest.skipIf(_testcapi is None, 'need _testcapi')
def test_wakeup_write_error(self):
# Issue #16105: write() errors in the C signal handler should not
# pass silently.
# Use a subprocess to have only one thread.
code = """if 1:
+ import _testcapi
import errno
import fcntl
import os
import signal
import sys
- import time
from test.support import captured_stderr
def handler(signum, frame):
signal.set_wakeup_fd(r)
try:
with captured_stderr() as err:
- signal.alarm(1)
- time.sleep(5.0)
+ _testcapi.raise_signal(signal.SIGALRM)
except ZeroDivisionError:
# An ignored exception should have been printed out on stderr
err = err.getvalue()
raise AssertionError(err)
else:
raise AssertionError("ZeroDivisionError not raised")
+
+ os.close(r)
+ os.close(w)
"""
r, w = os.pipe()
try:
def test_signum(self):
self.check_wakeup("""def test():
+ import _testcapi
signal.signal(signal.SIGUSR1, handler)
- os.kill(os.getpid(), signal.SIGUSR1)
- os.kill(os.getpid(), signal.SIGALRM)
+ _testcapi.raise_signal(signal.SIGUSR1)
+ _testcapi.raise_signal(signal.SIGALRM)
""", signal.SIGUSR1, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
signal.signal(signum2, handler)
signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
- os.kill(os.getpid(), signum1)
- os.kill(os.getpid(), signum2)
+ _testcapi.raise_signal(signum1)
+ _testcapi.raise_signal(signum2)
# Unblocking the 2 signals calls the C signal handler twice
signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
""", signal.SIGUSR1, signal.SIGUSR2, ordered=False)
sys.stdout.flush()
# run the test twice
- for loop in range(2):
- # send a SIGALRM in a second (during the read)
- signal.alarm(1)
- try:
- # blocking call: read from a pipe without data
- os.read(r, 1)
- except OSError as err:
- if err.errno != errno.EINTR:
- raise
- else:
- sys.exit(2)
- sys.exit(3)
+ try:
+ for loop in range(2):
+ # send a SIGALRM in a second (during the read)
+ signal.alarm(1)
+ try:
+ # blocking call: read from a pipe without data
+ os.read(r, 1)
+ except OSError as err:
+ if err.errno != errno.EINTR:
+ raise
+ else:
+ sys.exit(2)
+ sys.exit(3)
+ finally:
+ os.close(r)
+ os.close(w)
""" % (interrupt,)
with spawn_python('-c', code) as process:
try:
#include <float.h>
#include "structmember.h"
#include "datetime.h"
+#include <signal.h>
#ifdef WITH_THREAD
#include "pythread.h"
}
#endif /* WITH_THREAD */
+static PyObject*
+test_raise_signal(PyObject* self, PyObject *args)
+{
+ int signum, err;
+
+ if (PyArg_ParseTuple(args, "i:raise_signal", &signum) < 0)
+ return NULL;
+
+ err = raise(signum);
+ if (err)
+ return PyErr_SetFromErrno(PyExc_OSError);
+
+ if (PyErr_CheckSignals() < 0)
+ return NULL;
+
+ Py_RETURN_NONE;
+}
+
static PyMethodDef TestMethods[] = {
{"raise_exception", raise_exception, METH_VARARGS},
{"docstring_with_signature_with_defaults",
(PyCFunction)test_with_docstring, METH_NOARGS,
docstring_with_signature_with_defaults},
+ {"raise_signal",
+ (PyCFunction)test_raise_signal, METH_VARARGS},
#ifdef WITH_THREAD
{"call_in_temporary_c_thread", call_in_temporary_c_thread, METH_O,
PyDoc_STR("set_error_class(error_class) -> None")},
Py_RETURN_NONE;
}
-#ifdef SIGBUS
-static PyObject *
-faulthandler_sigbus(PyObject *self, PyObject *args)
-{
- raise(SIGBUS);
- Py_RETURN_NONE;
-}
-#endif
-
-#ifdef SIGILL
-static PyObject *
-faulthandler_sigill(PyObject *self, PyObject *args)
-{
- raise(SIGILL);
- Py_RETURN_NONE;
-}
-#endif
-
static PyObject *
faulthandler_fatal_error_py(PyObject *self, PyObject *args)
{
PyDoc_STR("_sigabrt(): raise a SIGABRT signal")},
{"_sigfpe", (PyCFunction)faulthandler_sigfpe, METH_NOARGS,
PyDoc_STR("_sigfpe(): raise a SIGFPE signal")},
-#ifdef SIGBUS
- {"_sigbus", (PyCFunction)faulthandler_sigbus, METH_NOARGS,
- PyDoc_STR("_sigbus(): raise a SIGBUS signal")},
-#endif
-#ifdef SIGILL
- {"_sigill", (PyCFunction)faulthandler_sigill, METH_NOARGS,
- PyDoc_STR("_sigill(): raise a SIGILL signal")},
-#endif
{"_fatal_error", faulthandler_fatal_error_py, METH_VARARGS,
PyDoc_STR("_fatal_error(message): call Py_FatalError(message)")},
#if defined(HAVE_SIGALTSTACK) && defined(HAVE_SIGACTION)