def _flush_unlocked(self):
if self.closed:
raise ValueError("flush of closed file")
- written = 0
- try:
- while self._write_buf:
- try:
- n = self.raw.write(self._write_buf)
- except InterruptedError:
- continue
- if n > len(self._write_buf) or n < 0:
- raise IOError("write() returned incorrect number of bytes")
- del self._write_buf[:n]
- written += n
- except BlockingIOError as e:
- n = e.characters_written
+ while self._write_buf:
+ try:
+ n = self.raw.write(self._write_buf)
++ except InterruptedError:
++ continue
+ except BlockingIOError:
+ raise RuntimeError("self.raw should implement RawIOBase: it "
+ "should not raise BlockingIOError")
- except IOError as e:
- if e.errno != EINTR:
- raise
- continue
+ if n is None:
+ raise BlockingIOError(
+ errno.EAGAIN,
+ "write could not complete without blocking", 0)
+ if n > len(self._write_buf) or n < 0:
+ raise IOError("write() returned incorrect number of bytes")
del self._write_buf[:n]
- written += n
- raise BlockingIOError(e.errno, e.strerror, written)
def tell(self):
return _BufferedIOMixin.tell(self) + len(self._write_buf)
with self.open(support.TESTFN, **kwargs) as f:
self.assertRaises(TypeError, pickle.dumps, f, protocol)
+ @unittest.skipUnless(fcntl, 'fcntl required for this test')
+ def test_nonblock_pipe_write_bigbuf(self):
+ self._test_nonblock_pipe_write(16*1024)
+
+ @unittest.skipUnless(fcntl, 'fcntl required for this test')
+ def test_nonblock_pipe_write_smallbuf(self):
+ self._test_nonblock_pipe_write(1024)
+
+ def _set_non_blocking(self, fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ self.assertNotEqual(flags, -1)
+ res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+ self.assertEqual(res, 0)
+
+ def _test_nonblock_pipe_write(self, bufsize):
+ sent = []
+ received = []
+ r, w = os.pipe()
+ self._set_non_blocking(r)
+ self._set_non_blocking(w)
+
+ # To exercise all code paths in the C implementation we need
+ # to play with buffer sizes. For instance, if we choose a
+ # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
+ # then we will never get a partial write of the buffer.
+ rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
+ wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
+
+ with rf, wf:
+ for N in 9999, 73, 7574:
+ try:
+ i = 0
+ while True:
+ msg = bytes([i % 26 + 97]) * N
+ sent.append(msg)
+ wf.write(msg)
+ i += 1
+
+ except self.BlockingIOError as e:
+ self.assertEqual(e.args[0], errno.EAGAIN)
++ self.assertEqual(e.args[2], e.characters_written)
+ sent[-1] = sent[-1][:e.characters_written]
+ received.append(rf.read())
+ msg = b'BLOCKED'
+ wf.write(msg)
+ sent.append(msg)
+
+ while True:
+ try:
+ wf.flush()
+ break
+ except self.BlockingIOError as e:
+ self.assertEqual(e.args[0], errno.EAGAIN)
++ self.assertEqual(e.args[2], e.characters_written)
+ self.assertEqual(e.characters_written, 0)
+ received.append(rf.read())
+
+ received += iter(rf.read, None)
+
+ sent, received = b''.join(sent), b''.join(received)
+ self.assertTrue(sent == received)
+ self.assertTrue(wf.closed)
+ self.assertTrue(rf.closed)
+
class CMiscIOTest(MiscIOTest):
io = io