# these are all functions _testcapi exports whose name begins with 'test_'.
from __future__ import with_statement
+import random
+import subprocess
import sys
import time
-import random
import unittest
from test import support
try:
self.assertEqual(testfunction.attribute, "test")
self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
+ def test_no_FatalError_infinite_loop(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import _testcapi;'
+ '_testcapi.crash_no_current_thread()'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ (out, err) = p.communicate()
+ self.assertEqual(out, b'')
+ # This used to cause an infinite loop.
+ self.assertEqual(err,
+ b'Fatal Python error:'
+ b' PyThreadState_Get: no current thread\n')
+
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
return PyErr_NewExceptionWithDoc(name, doc, base, dict);
}
+/* Test that the fatal error from not having a current thread doesn't
+ cause an infinite loop. Run via Lib/test/test_capi.py */
+static PyObject *
+crash_no_current_thread(PyObject *self)
+{
+ Py_BEGIN_ALLOW_THREADS
+ PyErr_SetString(PyExc_SystemError, "bork bork bork");
+ Py_END_ALLOW_THREADS
+ return NULL;
+}
+
static PyMethodDef TestMethods[] = {
{"raise_exception", raise_exception, METH_VARARGS},
{"raise_memoryerror", (PyCFunction)raise_memoryerror, METH_NOARGS},
{"code_newempty", code_newempty, METH_VARARGS},
{"make_exception_with_doc", (PyCFunction)make_exception_with_doc,
METH_VARARGS | METH_KEYWORDS},
+ {"crash_no_current_thread", (PyCFunction)crash_no_current_thread, METH_NOARGS},
{NULL, NULL} /* sentinel */
};
PyObject *
PyErr_Occurred(void)
{
- PyThreadState *tstate = PyThreadState_GET();
-
- return tstate->curexc_type;
+ /* If there is no thread state, PyThreadState_GET calls
+ Py_FatalError, which calls PyErr_Occurred. To avoid the
+ resulting infinite loop, we inline PyThreadState_GET here and
+ treat no thread as no error. */
+ PyThreadState *tstate =
+ ((PyThreadState*)_Py_atomic_load_relaxed(&_PyThreadState_Current));
+
+ return tstate == NULL ? NULL : tstate->curexc_type;
}