def test_invalid_operations(self):
# Try writing on a file opened in read mode and vice-versa.
for mode in ("w", "wb"):
- with open(support.TESTFN, mode) as fp:
+ with self.open(support.TESTFN, mode) as fp:
self.assertRaises(IOError, fp.read)
self.assertRaises(IOError, fp.readline)
- with open(support.TESTFN, "rb") as fp:
+ with self.open(support.TESTFN, "rb") as fp:
self.assertRaises(IOError, fp.write, b"blah")
self.assertRaises(IOError, fp.writelines, [b"blah\n"])
- with open(support.TESTFN, "r") as fp:
+ with self.open(support.TESTFN, "r") as fp:
self.assertRaises(IOError, fp.write, "blah")
self.assertRaises(IOError, fp.writelines, ["blah\n"])
def test_with_open(self):
for bufsize in (0, 1, 100):
f = None
- with open(support.TESTFN, "wb", bufsize) as f:
+ with self.open(support.TESTFN, "wb", bufsize) as f:
f.write(b"xxx")
self.assertEqual(f.closed, True)
f = None
try:
- with open(support.TESTFN, "wb", bufsize) as f:
+ with self.open(support.TESTFN, "wb", bufsize) as f:
1/0
except ZeroDivisionError:
self.assertEqual(f.closed, True)
del f
support.gc_collect()
self.assertEqual(record, [1, 2, 3])
- with open(support.TESTFN, "rb") as f:
+ with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"xxx")
def _check_base_destructor(self, base):
del f
support.gc_collect()
self.assert_(wr() is None, wr)
- with open(support.TESTFN, "rb") as f:
+ with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"abcxxx")
def test_unbounded_file(self):
self.skipTest("test can only run in a 32-bit address space")
if support.real_max_memuse < support._2G:
self.skipTest("test requires at least 2GB of memory")
- with open(zero, "rb", buffering=0) as f:
+ with self.open(zero, "rb", buffering=0) as f:
self.assertRaises(OverflowError, f.read)
- with open(zero, "rb") as f:
+ with self.open(zero, "rb") as f:
self.assertRaises(OverflowError, f.read)
- with open(zero, "r") as f:
+ with self.open(zero, "r") as f:
self.assertRaises(OverflowError, f.read)
class CIOTest(IOTest):
l = list(range(256)) * N
random.shuffle(l)
s = bytes(bytearray(l))
- with io.open(support.TESTFN, "wb") as f:
+ with self.open(support.TESTFN, "wb") as f:
f.write(s)
- with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
+ with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
bufio = self.tp(raw, 8)
errors = []
results = []
def test_truncate(self):
# Truncate implicitly flushes the buffer.
- with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
+ with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
bufio = self.tp(raw, 8)
bufio.write(b"abcdef")
self.assertEqual(bufio.truncate(3), 3)
self.assertEqual(bufio.tell(), 3)
- with io.open(support.TESTFN, "rb", buffering=0) as f:
+ with self.open(support.TESTFN, "rb", buffering=0) as f:
self.assertEqual(f.read(), b"abc")
def test_threads(self):
# writing the buffer to the raw streams. This is in addition
# to concurrency issues due to switching threads in the middle
# of Python code.
- with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
+ with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
bufio = self.tp(raw, 8)
errors = []
def f():
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
bufio.close()
- with io.open(support.TESTFN, "rb") as f:
+ with self.open(support.TESTFN, "rb") as f:
s = f.read()
for i in range(256):
self.assertEquals(s.count(bytes([i])), N)
del f
support.gc_collect()
self.assert_(wr() is None, wr)
- with open(support.TESTFN, "rb") as f:
+ with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"123xxx")
del t
support.gc_collect()
self.assert_(wr() is None, wr)
- with open(support.TESTFN, "rb") as f:
+ with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"456def")
class PyTextIOWrapperTest(TextIOWrapperTest):