This is not implemented for read-only and non-blocking streams.
"""
+ if self.__closed:
+ raise ValueError("flush of closed file")
+ #self._checkClosed()
# XXX Should this return the number of bytes written???
__closed = False
This method has no effect if the file is already closed.
"""
if not self.__closed:
- try:
- self.flush()
- except IOError:
- pass # If flush() fails, just give up
+ self.flush()
self.__closed = True
def __del__(self):
def close(self):
if not self.closed:
- try:
- self.flush()
- except IOError:
- pass # If flush() fails, just give up
+ self.flush()
self.raw.close()
### Inquiries ###
return self.raw.truncate(pos)
def flush(self):
+ if self.closed:
+ raise ValueError("flush of closed file")
with self._write_lock:
self._flush_unlocked()
self._telling = self._seekable
def close(self):
- try:
+ if not self.closed:
self.flush()
- except:
- pass # If flush() fails, just give up
- self.buffer.close()
+ self.buffer.close()
@property
def closed(self):
file = io.open(f.fileno(), "r", closefd=False)
self.assertEqual(file.buffer.raw.closefd, False)
+ def test_flush_error_on_close(self):
+ f = io.open(test_support.TESTFN, "wb", buffering=0)
+ def bad_flush():
+ raise IOError()
+ f.flush = bad_flush
+ self.assertRaises(IOError, f.close) # exception not swallowed
+
+ def test_multi_close(self):
+ f = io.open(test_support.TESTFN, "wb", buffering=0)
+ f.close()
+ f.close()
+ f.close()
+ self.assertRaises(ValueError, f.flush)
+
class MemorySeekTestMixin:
finally:
test_support.unlink(test_support.TESTFN)
+ def test_flush_error_on_close(self):
+ raw = MockRawIO()
+ def bad_flush():
+ raise IOError()
+ raw.flush = bad_flush
+ b = io.BufferedWriter(raw)
+ self.assertRaises(IOError, b.close) # exception not swallowed
+
+ def test_multi_close(self):
+ raw = MockRawIO()
+ b = io.BufferedWriter(raw)
+ b.close()
+ b.close()
+ b.close()
+ self.assertRaises(ValueError, b.flush)
+
class BufferedRWPairTest(unittest.TestCase):
decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
self.check_newline_decoder_utf8(decoder)
+ def test_flush_error_on_close(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
+ def bad_flush():
+ raise IOError()
+ txt.flush = bad_flush
+ self.assertRaises(IOError, txt.close) # exception not swallowed
+
+ def test_multi_close(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
+ txt.close()
+ txt.close()
+ txt.close()
+ self.assertRaises(ValueError, txt.flush)
+
# XXX Tests for open()
Library
-------
+- Issue #7865: The close() method of :mod:`io` objects should not swallow
+ exceptions raised by the implicit flush(). Also ensure that calling
+ close() several times is supported. Initial patch by Pascal Chambon.
+
- Issue #8581: logging: removed errors raised when closing handlers twice.
- Issue #4687: Fix accuracy of garbage collection runtimes displayed with