self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
+ self._encoder = None
self._decoder = None
self._pending = ""
self._snapshot = None
haslf = (self._writetranslate or self._line_buffering) and "\n" in s
if haslf and self._writetranslate and self._writenl != "\n":
s = s.replace("\n", self._writenl)
+ encoder = self._encoder or self._get_encoder()
# XXX What if we were just reading?
- b = s.encode(self._encoding, self._errors)
+ b = encoder.encode(s)
self.buffer.write(b)
if self._line_buffering and (haslf or "\r" in s):
self.flush()
self._decoder.reset()
return length
+ def _get_encoder(self):
+ make_encoder = codecs.getincrementalencoder(self._encoding)
+ self._encoder = make_encoder(self._errors)
+ return self._encoder
+
def _get_decoder(self):
make_decoder = codecs.getincrementaldecoder(self._encoding)
- if make_decoder is None:
- raise IOError("Can't find an incremental decoder for encoding %s" %
- self._encoding)
decoder = make_decoder(self._errors)
if self._readuniversal:
decoder = IncrementalNewlineDecoder(decoder, self._readtranslate)
f.readline()
f.tell()
+ def testEncodedWrites(self):
+ data = "1234567890"
+ tests = ("utf-16",
+ "utf-16-le",
+ "utf-16-be",
+ "utf-32",
+ "utf-32-le",
+ "utf-32-be")
+ for encoding in tests:
+ buf = io.BytesIO()
+ f = io.TextIOWrapper(buf, encoding=encoding)
+ # Check if the BOM is written only once (see issue1753).
+ f.write(data)
+ f.write(data)
+ f.seek(0)
+ self.assertEquals(f.read(), data * 2)
+ self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
+
def timingTest(self):
timer = time.time
enc = "utf8"