# so that the signature can match the signature of the C version.
def __init__(self, buffer, encoding=None, errors=None, newline=None,
line_buffering=False, write_through=False):
- if newline is not None and not isinstance(newline, str):
- raise TypeError("illegal newline type: %r" % (type(newline),))
- if newline not in (None, "", "\n", "\r", "\r\n"):
- raise ValueError("illegal newline value: %r" % (newline,))
+ self._check_newline(newline)
if encoding is None:
try:
encoding = os.device_encoding(buffer.fileno())
raise ValueError("invalid errors: %r" % errors)
self._buffer = buffer
+ self._decoded_chars = '' # buffer for text returned from decoder
+ self._decoded_chars_used = 0 # offset into _decoded_chars for read()
+ self._snapshot = None # info for reconstructing decoder state
+ self._seekable = self._telling = self.buffer.seekable()
+ self._has_read1 = hasattr(self.buffer, 'read1')
+ self._configure(encoding, errors, newline,
+ line_buffering, write_through)
+
+    def _check_newline(self, newline):
+        # Validate a newline argument: it must be None or a str, and if it
+        # is a str it must be one of "", LF, CR or CRLF.
+        # Raises TypeError for a wrong type and ValueError for a wrong value.
+        if newline is None:
+            return
+        if not isinstance(newline, str):
+            raise TypeError("illegal newline type: %r" % (type(newline),))
+        if newline not in ("", "\n", "\r", "\r\n"):
+            raise ValueError("illegal newline value: %r" % (newline,))
+
+ def _configure(self, encoding=None, errors=None, newline=None,
+ line_buffering=False, write_through=False):
self._encoding = encoding
self._errors = errors
+ self._encoder = None
+ self._decoder = None
+ self._b2cratio = 0.0
+
self._readuniversal = not newline
self._readtranslate = newline is None
self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
- self._encoder = None
- self._decoder = None
- self._decoded_chars = '' # buffer for text returned from decoder
- self._decoded_chars_used = 0 # offset into _decoded_chars for read()
- self._snapshot = None # info for reconstructing decoder state
- self._seekable = self._telling = self.buffer.seekable()
- self._has_read1 = hasattr(self.buffer, 'read1')
- self._b2cratio = 0.0
+ self._line_buffering = line_buffering
+ self._write_through = write_through
+
+ # don't write a BOM in the middle of a file
if self._seekable and self.writable():
position = self.buffer.tell()
if position != 0:
# Sometimes the encoder doesn't exist
pass
- self._configure(line_buffering, write_through)
-
- def _configure(self, line_buffering=False, write_through=False):
- self._line_buffering = line_buffering
- self._write_through = write_through
-
# self._snapshot is either None, or a tuple (dec_flags, next_input)
# where dec_flags is the second (integer) item of the decoder state
# and next_input is the chunk of input bytes that comes next after the
def buffer(self):
return self._buffer
- def reconfigure(self, *, line_buffering=None, write_through=None):
+ def reconfigure(self, *,
+ encoding=None, errors=None, newline=Ellipsis,
+ line_buffering=None, write_through=None):
"""Reconfigure the text stream with new parameters.
This also flushes the stream.
"""
+ # Once a decoder has been created, encoding, errors and newline can no
+ # longer be changed; only the buffering options may still be updated.
+ if (self._decoder is not None
+ and (encoding is not None or errors is not None
+ or newline is not Ellipsis)):
+ raise UnsupportedOperation(
+ "It is not possible to set the encoding or newline of stream "
+ "after the first read")
+
+ # errors not given: keep the current handler, unless a new encoding is
+ # requested, in which case fall back to 'strict'.
+ if errors is None:
+ if encoding is None:
+ errors = self._errors
+ else:
+ errors = 'strict'
+ elif not isinstance(errors, str):
+ raise TypeError("invalid errors: %r" % errors)
+
+ # encoding not given: keep the current encoding.
+ if encoding is None:
+ encoding = self._encoding
+ else:
+ if not isinstance(encoding, str):
+ raise TypeError("invalid encoding: %r" % encoding)
+
+ # Ellipsis (rather than None) means "newline not given", because None
+ # is itself a legal newline value accepted by _check_newline().
+ if newline is Ellipsis:
+ newline = self._readnl
+ self._check_newline(newline)
+
if line_buffering is None:
line_buffering = self.line_buffering
if write_through is None:
write_through = self.write_through
+
self.flush()
- self._configure(line_buffering, write_through)
+ self._configure(encoding, errors, newline,
+ line_buffering, write_through)
def seekable(self):
if self.closed:
F.tell = lambda x: 0
t = self.TextIOWrapper(F(), encoding='utf-8')
+ def test_reconfigure_encoding_read(self):
+ # latin1 -> utf8
+ # (latin1 can decode utf-8 encoded string)
+ data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
+ raw = self.BytesIO(data)
+ txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+ self.assertEqual(txt.readline(), 'abc\xe9\n')
+ # After the first read, changing the encoding or the newline mode
+ # must be rejected with UnsupportedOperation.
+ with self.assertRaises(self.UnsupportedOperation):
+ txt.reconfigure(encoding='utf-8')
+ with self.assertRaises(self.UnsupportedOperation):
+ txt.reconfigure(newline=None)
+
+ def test_reconfigure_write_fromascii(self):
+ # ascii has a specific encodefunc in the C implementation,
+ # but utf-8-sig has not. Make sure that we get rid of the
+ # cached encodefunc when we switch encoders.
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('foo\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('\xe9\n')
+ txt.flush()
+ # '\xe9' must come out UTF-8 encoded (b'\xc3\xa9'); the expected bytes
+ # contain no BOM because the switch happens after data was written.
+ self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
+
+ def test_reconfigure_write(self):
+ # latin -> utf8
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+ txt.write('abc\xe9\n')
+ txt.reconfigure(encoding='utf-8')
+ # reconfigure() flushes the stream, so the latin1-encoded bytes are
+ # expected in raw without an explicit flush().
+ self.assertEqual(raw.getvalue(), b'abc\xe9\n')
+ txt.write('d\xe9f\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
+
+ # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
+ # the file
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('abc\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('d\xe9f\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
+
+ def test_reconfigure_write_non_seekable(self):
+ raw = self.BytesIO()
+ # Make the BytesIO report itself as non-seekable and disable seek().
+ raw.seekable = lambda: False
+ raw.seek = None
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('abc\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('d\xe9f\n')
+ txt.flush()
+
+ # If the raw stream is not seekable, there'll be a BOM
+ self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
+
+ def test_reconfigure_defaults(self):
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
+ # encoding=None keeps both the current encoding and error handler.
+ txt.reconfigure(encoding=None)
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+ txt.write('LF\n')
+
+ # Changing only newline leaves encoding and errors untouched.
+ txt.reconfigure(newline='\r\n')
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+
+ # Changing only errors leaves the encoding untouched.
+ txt.reconfigure(errors='ignore')
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'ignore')
+ txt.write('CRLF\n')
+
+ # Changing the encoding without giving errors resets errors to 'strict'.
+ txt.reconfigure(encoding='utf-8', newline=None)
+ self.assertEqual(txt.errors, 'strict')
+ txt.seek(0)
+ self.assertEqual(txt.read(), 'LF\nCRLF\n')
+
+ self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
+
+ def test_reconfigure_newline(self):
+ # Reading: the newline mode set by reconfigure() before the first read
+ # is the one that governs readline().
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline=None)
+ self.assertEqual(txt.readline(), 'CR\n')
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline='')
+ self.assertEqual(txt.readline(), 'CR\r')
+ raw = self.BytesIO(b'CR\rLF\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.reconfigure(newline='\n')
+ self.assertEqual(txt.readline(), 'CR\rLF\n')
+ raw = self.BytesIO(b'LF\nCR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline='\r')
+ self.assertEqual(txt.readline(), 'LF\nCR\r')
+ raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.reconfigure(newline='\r\n')
+ self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
+
+ # Writing: each write() is translated with the newline mode in effect
+ # at the time of that write.
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
+ txt.reconfigure(newline=None)
+ txt.write('linesep\n')
+ txt.reconfigure(newline='')
+ txt.write('LF\n')
+ txt.reconfigure(newline='\n')
+ txt.write('LF\n')
+ txt.reconfigure(newline='\r')
+ txt.write('CR\n')
+ txt.reconfigure(newline='\r\n')
+ txt.write('CRLF\n')
+ expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
+ self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
+
class MemviewBytesIO(io.BytesIO):
'''A BytesIO object whose read method returns memoryviews
static int
_io_TextIOWrapper___init___impl(textio *self, PyObject *buffer,
- const char *encoding, const char *errors,
+ const char *encoding, PyObject *errors,
const char *newline, int line_buffering,
int write_through);
{
int return_value = -1;
static const char * const _keywords[] = {"buffer", "encoding", "errors", "newline", "line_buffering", "write_through", NULL};
- static _PyArg_Parser _parser = {"O|zzzii:TextIOWrapper", _keywords, 0};
+ static _PyArg_Parser _parser = {"O|zOzii:TextIOWrapper", _keywords, 0};
PyObject *buffer;
const char *encoding = NULL;
- const char *errors = NULL;
+ PyObject *errors = Py_None;
const char *newline = NULL;
int line_buffering = 0;
int write_through = 0;
}
PyDoc_STRVAR(_io_TextIOWrapper_reconfigure__doc__,
-"reconfigure($self, /, *, line_buffering=None, write_through=None)\n"
+"reconfigure($self, /, *, encoding=None, errors=None, newline=None,\n"
+" line_buffering=None, write_through=None)\n"
"--\n"
"\n"
"Reconfigure the text stream with new parameters.\n"
{"reconfigure", (PyCFunction)_io_TextIOWrapper_reconfigure, METH_FASTCALL|METH_KEYWORDS, _io_TextIOWrapper_reconfigure__doc__},
static PyObject *
-_io_TextIOWrapper_reconfigure_impl(textio *self,
+_io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding,
+ PyObject *errors, PyObject *newline_obj,
PyObject *line_buffering_obj,
PyObject *write_through_obj);
_io_TextIOWrapper_reconfigure(textio *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
- static const char * const _keywords[] = {"line_buffering", "write_through", NULL};
- static _PyArg_Parser _parser = {"|$OO:reconfigure", _keywords, 0};
+ static const char * const _keywords[] = {"encoding", "errors", "newline", "line_buffering", "write_through", NULL};
+ static _PyArg_Parser _parser = {"|$OOOOO:reconfigure", _keywords, 0};
+ PyObject *encoding = Py_None;
+ PyObject *errors = Py_None;
+ PyObject *newline_obj = NULL;
PyObject *line_buffering_obj = Py_None;
PyObject *write_through_obj = Py_None;
if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
- &line_buffering_obj, &write_through_obj)) {
+ &encoding, &errors, &newline_obj, &line_buffering_obj, &write_through_obj)) {
goto exit;
}
- return_value = _io_TextIOWrapper_reconfigure_impl(self, line_buffering_obj, write_through_obj);
+ return_value = _io_TextIOWrapper_reconfigure_impl(self, encoding, errors, newline_obj, line_buffering_obj, write_through_obj);
exit:
return return_value;
{
return _io_TextIOWrapper_close_impl(self);
}
-/*[clinic end generated code: output=679b3ac5284df4e0 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=b5be870b0039d577 input=a9049054013a1b77]*/
_Py_IDENTIFIER(seek);
_Py_IDENTIFIER(seekable);
_Py_IDENTIFIER(setstate);
+_Py_IDENTIFIER(strict);
_Py_IDENTIFIER(tell);
_Py_IDENTIFIER(writable);
Py_INCREF(decoder);
if (errors == NULL) {
- self->errors = PyUnicode_FromString("strict");
+ self->errors = _PyUnicode_FromId(&PyId_strict);
if (self->errors == NULL)
return -1;
}
else {
- Py_INCREF(errors);
self->errors = errors;
}
+ Py_INCREF(self->errors);
self->translate = translate;
self->seennl = 0;
PyObject *decoder;
PyObject *readnl;
PyObject *errors;
- const char *writenl; /* utf-8 encoded, NULL stands for \n */
+ const char *writenl; /* ASCII-encoded; NULL stands for \n */
char line_buffering;
char write_through;
char readuniversal;
+/* Specialized encodefunc for ASCII; self->errors (a str object) supplies
+ the error handler name. */
static PyObject *
ascii_encode(textio *self, PyObject *text)
{
- return _PyUnicode_AsASCIIString(text, PyBytes_AS_STRING(self->errors));
+ return _PyUnicode_AsASCIIString(text, PyUnicode_AsUTF8(self->errors));
}
+/* Specialized encodefunc for UTF-16 big-endian (byteorder argument 1);
+ self->errors (a str object) supplies the error handler name. */
static PyObject *
utf16be_encode(textio *self, PyObject *text)
{
return _PyUnicode_EncodeUTF16(text,
- PyBytes_AS_STRING(self->errors), 1);
+ PyUnicode_AsUTF8(self->errors), 1);
}
+/* Specialized encodefunc for UTF-16 little-endian (byteorder argument -1);
+ self->errors (a str object) supplies the error handler name. */
static PyObject *
utf16le_encode(textio *self, PyObject *text)
{
return _PyUnicode_EncodeUTF16(text,
- PyBytes_AS_STRING(self->errors), -1);
+ PyUnicode_AsUTF8(self->errors), -1);
}
static PyObject *
#endif
}
return _PyUnicode_EncodeUTF16(text,
- PyBytes_AS_STRING(self->errors), 0);
+ PyUnicode_AsUTF8(self->errors), 0);
}
+/* Specialized encodefunc for UTF-32 big-endian (byteorder argument 1);
+ self->errors (a str object) supplies the error handler name. */
static PyObject *
utf32be_encode(textio *self, PyObject *text)
{
return _PyUnicode_EncodeUTF32(text,
- PyBytes_AS_STRING(self->errors), 1);
+ PyUnicode_AsUTF8(self->errors), 1);
}
+/* Specialized encodefunc for UTF-32 little-endian (byteorder argument -1);
+ self->errors (a str object) supplies the error handler name. */
static PyObject *
utf32le_encode(textio *self, PyObject *text)
{
return _PyUnicode_EncodeUTF32(text,
- PyBytes_AS_STRING(self->errors), -1);
+ PyUnicode_AsUTF8(self->errors), -1);
}
static PyObject *
#endif
}
return _PyUnicode_EncodeUTF32(text,
- PyBytes_AS_STRING(self->errors), 0);
+ PyUnicode_AsUTF8(self->errors), 0);
}
+/* Specialized encodefunc for UTF-8; self->errors (a str object) supplies
+ the error handler name. */
static PyObject *
utf8_encode(textio *self, PyObject *text)
{
- return _PyUnicode_AsUTF8String(text, PyBytes_AS_STRING(self->errors));
+ return _PyUnicode_AsUTF8String(text, PyUnicode_AsUTF8(self->errors));
}
+/* Specialized encodefunc for Latin-1; self->errors (a str object) supplies
+ the error handler name. */
static PyObject *
latin1_encode(textio *self, PyObject *text)
{
- return _PyUnicode_AsLatin1String(text, PyBytes_AS_STRING(self->errors));
+ return _PyUnicode_AsLatin1String(text, PyUnicode_AsUTF8(self->errors));
}
/* Map normalized encoding names onto the specialized encoding funcs */
{NULL, NULL}
};
+/* Check a C-string newline argument.
+ *
+ * NULL, "", "\n", "\r" and "\r\n" are accepted and yield 0. Any other
+ * value sets ValueError and yields -1.
+ */
+static int
+validate_newline(const char *newline)
+{
+    if (newline == NULL || newline[0] == '\0') {
+        return 0;
+    }
+    if (strcmp(newline, "\n") == 0
+        || strcmp(newline, "\r") == 0
+        || strcmp(newline, "\r\n") == 0) {
+        return 0;
+    }
+    PyErr_Format(PyExc_ValueError,
+                 "illegal newline value: %s", newline);
+    return -1;
+}
+
+/* Store a newline mode on self.
+ *
+ * Replaces self->readnl (NULL when newline is NULL) and recomputes
+ * readuniversal, readtranslate, writetranslate and writenl from newline.
+ * The value is expected to have passed validate_newline() already.
+ *
+ * Returns 0 on success. If the new readnl string cannot be created, the
+ * previous self->readnl is restored and -1 is returned.
+ */
+static int
+set_newline(textio *self, const char *newline)
+{
+ PyObject *old = self->readnl;
+ if (newline == NULL) {
+ self->readnl = NULL;
+ }
+ else {
+ self->readnl = PyUnicode_FromString(newline);
+ if (self->readnl == NULL) {
+ self->readnl = old;
+ return -1;
+ }
+ }
+ self->readuniversal = (newline == NULL || newline[0] == '\0');
+ self->readtranslate = (newline == NULL);
+ self->writetranslate = (newline == NULL || newline[0] != '\0');
+ if (!self->readuniversal && self->readnl != NULL) {
+ // validate_newline() accepts only ASCII newlines.
+ assert(PyUnicode_KIND(self->readnl) == PyUnicode_1BYTE_KIND);
+ // writenl points into the data owned by self->readnl (no copy).
+ self->writenl = (const char *)PyUnicode_1BYTE_DATA(self->readnl);
+ if (strcmp(self->writenl, "\n") == 0) {
+ // NULL is the representation of "\n" for writenl.
+ self->writenl = NULL;
+ }
+ }
+ else {
+#ifdef MS_WINDOWS
+ self->writenl = "\r\n";
+#else
+ self->writenl = NULL;
+#endif
+ }
+ // Release the previous readnl only once the new state is in place.
+ Py_XDECREF(old);
+ return 0;
+}
+
+/* Install a decoder for self built from codec_info and errors.
+ *
+ * If self->buffer is not readable, nothing is changed and 0 is returned.
+ * Otherwise any existing self->decoder is dropped and replaced by a new
+ * incremental decoder; when self->readuniversal is set, that decoder is
+ * wrapped in an IncrementalNewlineDecoder using self->readtranslate.
+ *
+ * Returns 0 on success, -1 with an exception set on failure.
+ */
+static int
+_textiowrapper_set_decoder(textio *self, PyObject *codec_info,
+ const char *errors)
+{
+ PyObject *res;
+ int r;
+
+ res = _PyObject_CallMethodId(self->buffer, &PyId_readable, NULL);
+ if (res == NULL)
+ return -1;
+
+ r = PyObject_IsTrue(res);
+ Py_DECREF(res);
+ if (r == -1)
+ return -1;
+
+ // Not readable: no decoder is needed.
+ if (r != 1)
+ return 0;
+
+ Py_CLEAR(self->decoder);
+ self->decoder = _PyCodecInfo_GetIncrementalDecoder(codec_info, errors);
+ if (self->decoder == NULL)
+ return -1;
+
+ if (self->readuniversal) {
+ PyObject *incrementalDecoder = PyObject_CallFunction(
+ (PyObject *)&PyIncrementalNewlineDecoder_Type,
+ "Oi", self->decoder, (int)self->readtranslate);
+ if (incrementalDecoder == NULL)
+ return -1;
+ Py_CLEAR(self->decoder);
+ self->decoder = incrementalDecoder;
+ }
+
+ return 0;
+}
+
+/* Decode bytes with decoder, passing eof as the "final" flag.
+ *
+ * An IncrementalNewlineDecoder is called directly through its C entry
+ * point; any other decoder is called through its decode() method.
+ * Returns the decoded object, or NULL if decoding failed or
+ * check_decoded() rejected the result.
+ */
+static PyObject*
+_textiowrapper_decode(PyObject *decoder, PyObject *bytes, int eof)
+{
+ PyObject *chars;
+
+ if (Py_TYPE(decoder) == &PyIncrementalNewlineDecoder_Type)
+ chars = _PyIncrementalNewlineDecoder_decode(decoder, bytes, eof);
+ else
+ chars = PyObject_CallMethodObjArgs(decoder, _PyIO_str_decode, bytes,
+ eof ? Py_True : Py_False, NULL);
+
+ if (check_decoded(chars) < 0)
+ // check_decoded already decreases refcount
+ return NULL;
+
+ return chars;
+}
+
+/* Install an encoder for self built from codec_info and errors.
+ *
+ * If self->buffer is not writable, nothing is changed and 0 is returned.
+ * Otherwise the existing encoder and cached encodefunc are dropped, a new
+ * incremental encoder is created, and self->encodefunc is set to the
+ * specialized function from the encodefuncs table whose name equals
+ * codec_info.name (it stays NULL when there is no match).
+ *
+ * Returns 0 on success, -1 with an exception set on failure.
+ */
+static int
+_textiowrapper_set_encoder(textio *self, PyObject *codec_info,
+ const char *errors)
+{
+ PyObject *res;
+ int r;
+
+ res = _PyObject_CallMethodId(self->buffer, &PyId_writable, NULL);
+ if (res == NULL)
+ return -1;
+
+ r = PyObject_IsTrue(res);
+ Py_DECREF(res);
+ if (r == -1)
+ return -1;
+
+ // Not writable: no encoder is needed.
+ if (r != 1)
+ return 0;
+
+ Py_CLEAR(self->encoder);
+ self->encodefunc = NULL;
+ self->encoder = _PyCodecInfo_GetIncrementalEncoder(codec_info, errors);
+ if (self->encoder == NULL)
+ return -1;
+
+ /* Get the normalized name of the codec */
+ res = _PyObject_GetAttrId(codec_info, &PyId_name);
+ if (res == NULL) {
+ // A codec_info without a "name" attribute is tolerated.
+ if (PyErr_ExceptionMatches(PyExc_AttributeError))
+ PyErr_Clear();
+ else
+ return -1;
+ }
+ else if (PyUnicode_Check(res)) {
+ const encodefuncentry *e = encodefuncs;
+ while (e->name != NULL) {
+ if (_PyUnicode_EqualToASCIIString(res, e->name)) {
+ self->encodefunc = e->encodefunc;
+ break;
+ }
+ e++;
+ }
+ }
+ Py_XDECREF(res);
+
+ return 0;
+}
+
+/* Synchronize the encoder state with the current position of the buffer.
+ *
+ * Does nothing (returns 0) unless the stream is seekable and has an
+ * encoder. Otherwise self->encoding_start_of_stream is set to 1 when
+ * buffer.tell() equals 0; when it does not, the flag is set to 0 and
+ * encoder.setstate(0) is called.
+ * NOTE(review): presumably this is what keeps codecs such as utf-8-sig
+ * from emitting a BOM in the middle of a file -- confirm.
+ *
+ * Returns 0 on success, -1 with an exception set on failure.
+ */
+static int
+_textiowrapper_fix_encoder_state(textio *self)
+{
+ if (!self->seekable || !self->encoder) {
+ return 0;
+ }
+
+ self->encoding_start_of_stream = 1;
+
+ PyObject *cookieObj = PyObject_CallMethodObjArgs(
+ self->buffer, _PyIO_str_tell, NULL);
+ if (cookieObj == NULL) {
+ return -1;
+ }
+
+ int cmp = PyObject_RichCompareBool(cookieObj, _PyLong_Zero, Py_EQ);
+ Py_DECREF(cookieObj);
+ if (cmp < 0) {
+ return -1;
+ }
+
+ // cmp == 0 means the position is not zero.
+ if (cmp == 0) {
+ self->encoding_start_of_stream = 0;
+ PyObject *res = PyObject_CallMethodObjArgs(
+ self->encoder, _PyIO_str_setstate, _PyLong_Zero, NULL);
+ if (res == NULL) {
+ return -1;
+ }
+ Py_DECREF(res);
+ }
+
+ return 0;
+}
/*[clinic input]
_io.TextIOWrapper.__init__
buffer: object
encoding: str(accept={str, NoneType}) = NULL
- errors: str(accept={str, NoneType}) = NULL
+ errors: object = None
newline: str(accept={str, NoneType}) = NULL
line_buffering: bool(accept={int}) = False
write_through: bool(accept={int}) = False
static int
_io_TextIOWrapper___init___impl(textio *self, PyObject *buffer,
- const char *encoding, const char *errors,
+ const char *encoding, PyObject *errors,
const char *newline, int line_buffering,
int write_through)
-/*[clinic end generated code: output=56a83402ce2a8381 input=598d10cc5f2ed7dd]*/
+/*[clinic end generated code: output=72267c0c01032ed2 input=1c5dd5d78bfcc675]*/
{
PyObject *raw, *codec_info = NULL;
_PyIO_State *state = NULL;
self->ok = 0;
self->detached = 0;
- if (newline && newline[0] != '\0'
- && !(newline[0] == '\n' && newline[1] == '\0')
- && !(newline[0] == '\r' && newline[1] == '\0')
- && !(newline[0] == '\r' && newline[1] == '\n' && newline[2] == '\0')) {
- PyErr_Format(PyExc_ValueError,
- "illegal newline value: %s", newline);
+ if (errors == Py_None) {
+ errors = _PyUnicode_FromId(&PyId_strict); /* borrowed */
+ }
+ else if (!PyUnicode_Check(errors)) {
+ // Check 'errors' argument here because Argument Clinic doesn't support
+ // 'str(accept={str, NoneType})' converter.
+ PyErr_Format(
+ PyExc_TypeError,
+ "TextIOWrapper() argument 'errors' must be str or None, not %.50s",
+ errors->ob_type->tp_name);
+ return -1;
+ }
+
+ if (validate_newline(newline) < 0) {
return -1;
}
* of the partially constructed object (like self->encoding)
*/
- if (errors == NULL)
- errors = "strict";
- self->errors = PyBytes_FromString(errors);
- if (self->errors == NULL)
- goto error;
-
+ Py_INCREF(errors);
+ self->errors = errors;
self->chunk_size = 8192;
- self->readuniversal = (newline == NULL || newline[0] == '\0');
self->line_buffering = line_buffering;
self->write_through = write_through;
- self->readtranslate = (newline == NULL);
- if (newline) {
- self->readnl = PyUnicode_FromString(newline);
- if (self->readnl == NULL)
- goto error;
- }
- self->writetranslate = (newline == NULL || newline[0] != '\0');
- if (!self->readuniversal && self->readnl) {
- self->writenl = PyUnicode_AsUTF8(self->readnl);
- if (self->writenl == NULL)
- goto error;
- if (!strcmp(self->writenl, "\n"))
- self->writenl = NULL;
+ if (set_newline(self, newline) < 0) {
+ goto error;
}
-#ifdef MS_WINDOWS
- else
- self->writenl = "\r\n";
-#endif
+
+ self->buffer = buffer;
+ Py_INCREF(buffer);
/* Build the decoder object */
- res = _PyObject_CallMethodId(buffer, &PyId_readable, NULL);
- if (res == NULL)
+ if (_textiowrapper_set_decoder(self, codec_info, PyUnicode_AsUTF8(errors)) != 0)
goto error;
- r = PyObject_IsTrue(res);
- Py_DECREF(res);
- if (r == -1)
- goto error;
- if (r == 1) {
- self->decoder = _PyCodecInfo_GetIncrementalDecoder(codec_info,
- errors);
- if (self->decoder == NULL)
- goto error;
-
- if (self->readuniversal) {
- PyObject *incrementalDecoder = PyObject_CallFunction(
- (PyObject *)&PyIncrementalNewlineDecoder_Type,
- "Oi", self->decoder, (int)self->readtranslate);
- if (incrementalDecoder == NULL)
- goto error;
- Py_XSETREF(self->decoder, incrementalDecoder);
- }
- }
/* Build the encoder object */
- res = _PyObject_CallMethodId(buffer, &PyId_writable, NULL);
- if (res == NULL)
- goto error;
- r = PyObject_IsTrue(res);
- Py_DECREF(res);
- if (r == -1)
+ if (_textiowrapper_set_encoder(self, codec_info, PyUnicode_AsUTF8(errors)) != 0)
goto error;
- if (r == 1) {
- self->encoder = _PyCodecInfo_GetIncrementalEncoder(codec_info,
- errors);
- if (self->encoder == NULL)
- goto error;
- /* Get the normalized name of the codec */
- res = _PyObject_GetAttrId(codec_info, &PyId_name);
- if (res == NULL) {
- if (PyErr_ExceptionMatches(PyExc_AttributeError))
- PyErr_Clear();
- else
- goto error;
- }
- else if (PyUnicode_Check(res)) {
- const encodefuncentry *e = encodefuncs;
- while (e->name != NULL) {
- if (_PyUnicode_EqualToASCIIString(res, e->name)) {
- self->encodefunc = e->encodefunc;
- break;
- }
- e++;
- }
- }
- Py_XDECREF(res);
- }
/* Finished sorting out the codec details */
Py_CLEAR(codec_info);
- self->buffer = buffer;
- Py_INCREF(buffer);
-
if (Py_TYPE(buffer) == &PyBufferedReader_Type ||
Py_TYPE(buffer) == &PyBufferedWriter_Type ||
Py_TYPE(buffer) == &PyBufferedRandom_Type) {
self->has_read1 = _PyObject_HasAttrId(buffer, &PyId_read1);
self->encoding_start_of_stream = 0;
- if (self->seekable && self->encoder) {
- PyObject *cookieObj;
- int cmp;
-
- self->encoding_start_of_stream = 1;
-
- cookieObj = PyObject_CallMethodObjArgs(buffer, _PyIO_str_tell, NULL);
- if (cookieObj == NULL)
- goto error;
-
- cmp = PyObject_RichCompareBool(cookieObj, _PyLong_Zero, Py_EQ);
- Py_DECREF(cookieObj);
- if (cmp < 0) {
- goto error;
- }
-
- if (cmp == 0) {
- self->encoding_start_of_stream = 0;
- res = PyObject_CallMethodObjArgs(self->encoder, _PyIO_str_setstate,
- _PyLong_Zero, NULL);
- if (res == NULL)
- goto error;
- Py_DECREF(res);
- }
+ if (_textiowrapper_fix_encoder_state(self) < 0) {
+ goto error;
}
self->ok = 1;
return v != 0;
}
+/* Rebuild the decoder and encoder of self for new codec settings.
+ *
+ * encoding / errors: new values, or Py_None for "not specified". With
+ *   both unspecified the current values are kept; with only errors
+ *   unspecified and a new encoding given, errors falls back to "strict".
+ * newline_changed: non-zero forces the rebuild even when encoding and
+ *   errors are both unspecified, because the decoder wrapping depends on
+ *   the newline flags of self.
+ *
+ * On success self->encoding and self->errors are replaced and the encoder
+ * state is synchronized with the stream position.
+ * Returns 0 on success, -1 with an exception set on failure.
+ */
+static int
+textiowrapper_change_encoding(textio *self, PyObject *encoding,
+                              PyObject *errors, int newline_changed)
+{
+    /* Use existing settings where new settings are not specified */
+    if (encoding == Py_None && errors == Py_None && !newline_changed) {
+        return 0;  // no change
+    }
+
+    if (encoding == Py_None) {
+        encoding = self->encoding;
+        if (errors == Py_None) {
+            errors = self->errors;
+        }
+    }
+    else if (errors == Py_None) {
+        errors = _PyUnicode_FromId(&PyId_strict);  /* borrowed */
+        if (errors == NULL) {
+            // Creating the interned string can fail; don't dereference NULL.
+            return -1;
+        }
+    }
+
+    const char *c_errors = PyUnicode_AsUTF8(errors);
+    if (c_errors == NULL) {
+        return -1;
+    }
+
+    // PyUnicode_AsUTF8() fails (TypeError) for a non-str encoding; stop
+    // here instead of handing NULL to the codec lookup with an exception
+    // already pending.
+    const char *c_encoding = PyUnicode_AsUTF8(encoding);
+    if (c_encoding == NULL) {
+        return -1;
+    }
+
+    // Create new encoder & decoder
+    PyObject *codec_info = _PyCodec_LookupTextEncoding(
+        c_encoding, "codecs.open()");
+    if (codec_info == NULL) {
+        return -1;
+    }
+    if (_textiowrapper_set_decoder(self, codec_info, c_errors) != 0 ||
+            _textiowrapper_set_encoder(self, codec_info, c_errors) != 0) {
+        Py_DECREF(codec_info);
+        return -1;
+    }
+    Py_DECREF(codec_info);
+
+    // Take new references before releasing the old attribute values.
+    Py_INCREF(encoding);
+    Py_INCREF(errors);
+    Py_SETREF(self->encoding, encoding);
+    Py_SETREF(self->errors, errors);
+
+    return _textiowrapper_fix_encoder_state(self);
+}
/*[clinic input]
_io.TextIOWrapper.reconfigure
*
+ encoding: object = None
+ errors: object = None
+ newline as newline_obj: object(c_default="NULL") = None
line_buffering as line_buffering_obj: object = None
write_through as write_through_obj: object = None
[clinic start generated code]*/
static PyObject *
-_io_TextIOWrapper_reconfigure_impl(textio *self,
+_io_TextIOWrapper_reconfigure_impl(textio *self, PyObject *encoding,
+ PyObject *errors, PyObject *newline_obj,
PyObject *line_buffering_obj,
PyObject *write_through_obj)
-/*[clinic end generated code: output=7cdf79e7001e2856 input=baade27ecb9db7bc]*/
+/*[clinic end generated code: output=52b812ff4b3d4b0f input=671e82136e0f5822]*/
{
int line_buffering;
int write_through;
- PyObject *res;
+ const char *newline = NULL;
+
+ /* Check if something is in the read buffer */
+ if (self->decoded_chars != NULL) {
+ if (encoding != Py_None || errors != Py_None || newline_obj != NULL) {
+ /* Adjacent string literals are concatenated as-is, so the first
+ one must end with a space to keep "newline of stream" readable. */
+ _unsupported("It is not possible to set the encoding or newline "
+ "of stream after the first read");
+ return NULL;
+ }
+ }
+
+ if (newline_obj != NULL && newline_obj != Py_None) {
+ newline = PyUnicode_AsUTF8(newline_obj);
+ if (newline == NULL || validate_newline(newline) < 0) {
+ return NULL;
+ }
+ }
line_buffering = convert_optional_bool(line_buffering_obj,
self->line_buffering);
if (line_buffering < 0 || write_through < 0) {
return NULL;
}
- res = PyObject_CallMethodObjArgs((PyObject *) self, _PyIO_str_flush, NULL);
- Py_XDECREF(res);
+
+ PyObject *res = PyObject_CallMethodObjArgs((PyObject *)self, _PyIO_str_flush, NULL);
if (res == NULL) {
return NULL;
}
+ Py_DECREF(res);
+ self->b2cratio = 0;
+
+ if (newline_obj != NULL && set_newline(self, newline) < 0) {
+ return NULL;
+ }
+
+ if (textiowrapper_change_encoding(
+ self, encoding, errors, newline_obj != NULL) < 0) {
+ return NULL;
+ }
+
self->line_buffering = line_buffering;
self->write_through = write_through;
Py_RETURN_NONE;
nbytes = input_chunk_buf.len;
eof = (nbytes == 0);
- if (Py_TYPE(self->decoder) == &PyIncrementalNewlineDecoder_Type) {
- decoded_chars = _PyIncrementalNewlineDecoder_decode(
- self->decoder, input_chunk, eof);
- }
- else {
- decoded_chars = PyObject_CallMethodObjArgs(self->decoder,
- _PyIO_str_decode, input_chunk, eof ? Py_True : Py_False, NULL);
- }
- PyBuffer_Release(&input_chunk_buf);
- if (check_decoded(decoded_chars) < 0)
+ decoded_chars = _textiowrapper_decode(self->decoder, input_chunk, eof);
+ PyBuffer_Release(&input_chunk_buf);
+ if (decoded_chars == NULL)
goto fail;
+
textiowrapper_set_decoded_chars(self, decoded_chars);
nchars = PyUnicode_GET_LENGTH(decoded_chars);
if (nchars > 0)
textiowrapper_errors_get(textio *self, void *context)
{
CHECK_INITIALIZED(self);
- return PyUnicode_FromString(PyBytes_AS_STRING(self->errors));
+ Py_INCREF(self->errors);
+ return self->errors;
}
static PyObject *