"""New I/O library conforming to PEP 3116.
-This is an early prototype; eventually some of this will be
-reimplemented in C and the rest may be turned into a package.
+This is a prototype; hopefully eventually some of this will be
+reimplemented in C.
Conformance of alternative implementations: all arguments are intended
to be positional-only except the arguments of the open() function.
names like __iter__). Only the top-level names listed in the __all__
variable are part of the specification.
+XXX edge cases when switching between reading/writing
XXX need to default buffer size to 1 if isatty()
XXX need to support 1 meaning line-buffered
XXX don't use assert to validate input requirements
Character and line based layer over a BufferedIOBase object.
"""
- _CHUNK_SIZE = 64
+ _CHUNK_SIZE = 128
def __init__(self, buffer, encoding=None, newline=None):
if newline not in (None, "\n", "\r\n"):
self._decoder_in_rest_pickle = None
self._pending = ""
self._snapshot = None
- self._seekable = self.buffer.seekable()
+ self._seekable = self._telling = self.buffer.seekable()
# A word about _snapshot. This attribute is either None, or a
# tuple (decoder_pickle, readahead, pending) where decoder_pickle
def flush(self):
self.buffer.flush()
+ self._telling = self._seekable
def close(self):
self.flush()
def _read_chunk(self):
assert self._decoder is not None
- if not self._seekable:
+ if not self._telling:
readahead = self.buffer.read(self._CHUNK_SIZE)
pending = self._decoder.decode(readahead, not readahead)
return readahead, pending
def tell(self):
if not self._seekable:
raise IOError("Underlying stream is not seekable")
+ if not self._telling:
+ raise IOError("Telling position disabled by next() call")
self.flush()
position = self.buffer.tell()
if self._decoder is None or self._snapshot is None:
(whence,))
if pos < 0:
raise ValueError("Negative seek position %r" % (pos,))
+ self.flush()
orig_pos = pos
ds, pos = self._decode_decoder_state(pos)
if not ds:
self._pending = res[n:]
return res[:n]
+ def next(self) -> str:
+ self._telling = False
+ line = self.readline()
+ if not line:
+ self._snapshot = None
+ self._telling = self._seekable
+ raise StopIteration
+ return line
+
def readline(self, limit=None):
if limit is not None:
# XXX Hack to support limit argument, for backwards compatibility
"""Unit tests for io.py."""
import sys
+import time
import unittest
from itertools import chain
from test import test_support
rlines.append((pos, line))
self.assertEquals(rlines, wlines)
+ def testTelling(self):
+ f = io.open(test_support.TESTFN, "w+", encoding="utf8")
+ p0 = f.tell()
+ f.write(u"\xff\n")
+ p1 = f.tell()
+ f.write(u"\xff\n")
+ p2 = f.tell()
+ f.seek(0)
+ self.assertEquals(f.tell(), p0)
+ self.assertEquals(f.readline(), u"\xff\n")
+ self.assertEquals(f.tell(), p1)
+ self.assertEquals(f.readline(), u"\xff\n")
+ self.assertEquals(f.tell(), p2)
+ f.seek(0)
+ for line in f:
+ self.assertEquals(line, u"\xff\n")
+ self.assertRaises(IOError, f.tell)
+ self.assertEquals(f.tell(), p2)
+ f.close()
+
+ def timingTest(self):
+ timer = time.time
+ enc = "utf8"
+ line = u"\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
+ nlines = 10000
+ nchars = len(line)
+ nbytes = len(line.encode(enc))
+ for chunk_size in (32, 64, 128, 256):
+ f = io.open(test_support.TESTFN, "w+", encoding=enc)
+ f._CHUNK_SIZE = chunk_size
+ t0 = timer()
+ for i in range(nlines):
+ f.write(line)
+ f.flush()
+ t1 = timer()
+ f.seek(0)
+ for line in f:
+ pass
+ t2 = timer()
+ f.seek(0)
+ while f.readline():
+ pass
+ t3 = timer()
+ f.seek(0)
+ while f.readline():
+ f.tell()
+ t4 = timer()
+ f.close()
+ if test_support.verbose:
+ print("\nTiming test: %d lines of %d characters (%d bytes)" %
+ (nlines, nchars, nbytes))
+ print("File chunk size: %6s" % f._CHUNK_SIZE)
+ print("Writing: %6.3f seconds" % (t1-t0))
+ print("Reading using iteration: %6.3f seconds" % (t2-t1))
+ print("Reading using readline(): %6.3f seconds" % (t3-t2))
+ print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
+
# XXX Tests for open()