from collections import deque, UserList
from itertools import cycle, count
from test import support
+from test.script_helper import assert_python_ok
import codecs
import io # C implementation of io
encoding='quopri_codec')
self.assertRaises(TypeError, t.read)
+ def _check_create_at_shutdown(self, **kwargs):
+ # Issue #20037: creating a TextIOWrapper at shutdown
+ # shouldn't crash the interpreter.
+ iomod = self.io.__name__
+ code = """if 1:
+ import codecs
+ import {iomod} as io
+
+ # Avoid looking up codecs at shutdown
+ codecs.lookup('utf-8')
+
+ class C:
+ def __init__(self):
+ self.buf = io.BytesIO()
+ def __del__(self):
+ io.TextIOWrapper(self.buf, **{kwargs})
+ print("ok")
+ c = C()
+ """.format(iomod=iomod, kwargs=kwargs)
+ return assert_python_ok("-c", code)
+
+ def test_create_at_shutdown_without_encoding(self):
+ rc, out, err = self._check_create_at_shutdown()
+ if err:
+ # Can error out with a RuntimeError if the module state
+ # isn't found.
+ self.assertIn("RuntimeError: could not find io module state",
+ err.decode())
+ else:
+ self.assertEqual("ok", out.decode().strip())
+
+ def test_create_at_shutdown_with_encoding(self):
+ rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
+ errors='strict')
+ self.assertFalse(err)
+ self.assertEqual("ok", out.decode().strip())
+
class CTextIOWrapperTest(TextIOWrapperTest):
+ io = io
def test_initialization(self):
r = self.BytesIO(b"\xc3\xa9\n\n")
class PyTextIOWrapperTest(TextIOWrapperTest):
- pass
+ io = pyio
class IncrementalNewlineDecoderTest(unittest.TestCase):
import struct
import sys
import tempfile
+from test.script_helper import assert_python_ok
from test.support import (captured_stdout, run_with_locale, run_unittest,
patch, requires_zlib, TestHandler, Matcher)
import textwrap
logging.setLoggerClass(logging.Logger)
self.assertEqual(logging.getLoggerClass(), logging.Logger)
+ def test_logging_at_shutdown(self):
+ # Issue #20037
+ code = """if 1:
+ import logging
+
+ class A:
+ def __del__(self):
+ try:
+ raise ValueError("some error")
+ except Exception:
+ logging.exception("exception in __del__")
+
+ a = A()"""
+ rc, out, err = assert_python_ok("-c", code)
+ err = err.decode()
+ self.assertIn("exception in __del__", err)
+ self.assertIn("ValueError: some error", err)
+
+
class LogRecordTest(BaseTest):
def test_str_rep(self):
r = logging.makeLogRecord({})
Library
-------
+- Issue #20037: Avoid crashes when opening a text file late at interpreter
+ shutdown.
+
- Issue #19967: Thanks to the PEP 442, asyncio.Future now uses a
destructor to log uncaught exceptions, instead of the dedicated
_TracebackLogger class.
}
+_PyIO_State *
+_PyIO_get_module_state(void)
+{
+ PyObject *mod = PyState_FindModule(&_PyIO_Module);
+ _PyIO_State *state;
+ if (mod == NULL || (state = IO_MOD_STATE(mod)) == NULL) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "could not find io module state "
+ "(interpreter shutdown?)");
+ return NULL;
+ }
+ return state;
+}
+
PyObject *
_PyIO_get_locale_module(_PyIO_State *state)
{
} _PyIO_State;
#define IO_MOD_STATE(mod) ((_PyIO_State *)PyModule_GetState(mod))
-#define IO_STATE IO_MOD_STATE(PyState_FindModule(&_PyIO_Module))
+#define IO_STATE() _PyIO_get_module_state()
+extern _PyIO_State *_PyIO_get_module_state(void);
extern PyObject *_PyIO_get_locale_module(_PyIO_State *);
extern PyObject *_PyIO_str_close;
static PyObject *
bufferediobase_unsupported(const char *message)
{
- PyErr_SetString(IO_STATE->unsupported_operation, message);
+ _PyIO_State *state = IO_STATE();
+ if (state != NULL)
+ PyErr_SetString(state->unsupported_operation, message);
return NULL;
}
static PyObject *
err_mode(char *action)
{
- PyErr_Format(IO_STATE->unsupported_operation,
- "File not open for %s", action);
+ _PyIO_State *state = IO_STATE();
+ if (state != NULL)
+ PyErr_Format(state->unsupported_operation,
+ "File not open for %s", action);
return NULL;
}
static PyObject *
iobase_unsupported(const char *message)
{
- PyErr_SetString(IO_STATE->unsupported_operation, message);
+ _PyIO_State *state = IO_STATE();
+ if (state != NULL)
+ PyErr_SetString(state->unsupported_operation, message);
return NULL;
}
static PyObject *
_unsupported(const char *message)
{
- PyErr_SetString(IO_STATE->unsupported_operation, message);
+ _PyIO_State *state = IO_STATE();
+ if (state != NULL)
+ PyErr_SetString(state->unsupported_operation, message);
return NULL;
}
char *errors = NULL;
char *newline = NULL;
int line_buffering = 0, write_through = 0;
- _PyIO_State *state = IO_STATE;
+ _PyIO_State *state = NULL;
PyObject *res;
int r;
if (encoding == NULL) {
/* Try os.device_encoding(fileno) */
PyObject *fileno;
+ state = IO_STATE();
+ if (state == NULL)
+ goto error;
fileno = _PyObject_CallMethodId(buffer, &PyId_fileno, NULL);
/* Ignore only AttributeError and UnsupportedOperation */
if (fileno == NULL) {