]> granicus.if.org Git - python/commitdiff
Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is
authorAntoine Pitrou <solipsis@pitrou.net>
Mon, 21 Nov 2011 19:16:44 +0000 (20:16 +0100)
committerAntoine Pitrou <solipsis@pitrou.net>
Mon, 21 Nov 2011 19:16:44 +0000 (20:16 +0100)
raised when the wrapped raw file is non-blocking and the write would block.
Previous code assumed that the raw write() would raise BlockingIOError, but
RawIOBase.write() is defined to returned None when the call would block.
Patch by sbt.

Lib/_pyio.py
Lib/test/test_io.py
Misc/ACKS
Misc/NEWS
Modules/_io/bufferedio.c

index 3fa932563b3ff2d47c48b22765de65a830b18148..a2faeb32579b0b8b2b520a3e48316f18801e8e33 100644 (file)
@@ -6,6 +6,7 @@ import os
 import abc
 import codecs
 import warnings
+import errno
 # Import _thread instead of threading to reduce startup cost
 try:
     from _thread import allocate_lock as Lock
@@ -720,8 +721,11 @@ class _BufferedIOMixin(BufferedIOBase):
 
     def close(self):
         if self.raw is not None and not self.closed:
-            self.flush()
-            self.raw.close()
+            try:
+                # may raise BlockingIOError or BrokenPipeError etc
+                self.flush()
+            finally:
+                self.raw.close()
 
     def detach(self):
         if self.raw is None:
@@ -1080,13 +1084,9 @@ class BufferedWriter(_BufferedIOMixin):
             # XXX we can implement some more tricks to try and avoid
             # partial writes
             if len(self._write_buf) > self.buffer_size:
-                # We're full, so let's pre-flush the buffer
-                try:
-                    self._flush_unlocked()
-                except BlockingIOError as e:
-                    # We can't accept anything else.
-                    # XXX Why not just let the exception pass through?
-                    raise BlockingIOError(e.errno, e.strerror, 0)
+                # We're full, so let's pre-flush the buffer.  (This may
+                # raise BlockingIOError with characters_written == 0.)
+                self._flush_unlocked()
             before = len(self._write_buf)
             self._write_buf.extend(b)
             written = len(self._write_buf) - before
@@ -1117,24 +1117,23 @@ class BufferedWriter(_BufferedIOMixin):
     def _flush_unlocked(self):
         if self.closed:
             raise ValueError("flush of closed file")
-        written = 0
-        try:
-            while self._write_buf:
-                try:
-                    n = self.raw.write(self._write_buf)
-                except IOError as e:
-                    if e.errno != EINTR:
-                        raise
-                    continue
-                if n > len(self._write_buf) or n < 0:
-                    raise IOError("write() returned incorrect number of bytes")
-                del self._write_buf[:n]
-                written += n
-        except BlockingIOError as e:
-            n = e.characters_written
+        while self._write_buf:
+            try:
+                n = self.raw.write(self._write_buf)
+            except BlockingIOError:
+                raise RuntimeError("self.raw should implement RawIOBase: it "
+                                   "should not raise BlockingIOError")
+            except IOError as e:
+                if e.errno != EINTR:
+                    raise
+                continue
+            if n is None:
+                raise BlockingIOError(
+                    errno.EAGAIN,
+                    "write could not complete without blocking", 0)
+            if n > len(self._write_buf) or n < 0:
+                raise IOError("write() returned incorrect number of bytes")
             del self._write_buf[:n]
-            written += n
-            raise BlockingIOError(e.errno, e.strerror, written)
 
     def tell(self):
         return _BufferedIOMixin.tell(self) + len(self._write_buf)
index eb2ac5fe99d82499e94cd118aa12cdf98ee01750..df7ca15f58fe06a0070f4434f8807b864654d9eb 100644 (file)
@@ -42,7 +42,10 @@ try:
     import threading
 except ImportError:
     threading = None
-
+try:
+    import fcntl
+except ImportError:
+    fcntl = None
 
 def _default_chunk_size():
     """Get the default TextIOWrapper chunk size"""
@@ -242,9 +245,14 @@ class MockNonBlockWriterIO:
             except ValueError:
                 pass
             else:
-                self._blocker_char = None
-                self._write_stack.append(b[:n])
-                raise self.BlockingIOError(0, "test blocking", n)
+                if n > 0:
+                    # write data up to the first blocker
+                    self._write_stack.append(b[:n])
+                    return n
+                else:
+                    # cancel blocker and indicate would block
+                    self._blocker_char = None
+                    return None
         self._write_stack.append(b)
         return len(b)
 
@@ -2743,6 +2751,68 @@ class MiscIOTest(unittest.TestCase):
                 with self.open(support.TESTFN, **kwargs) as f:
                     self.assertRaises(TypeError, pickle.dumps, f, protocol)
 
+    @unittest.skipUnless(fcntl, 'fcntl required for this test')
+    def test_nonblock_pipe_write_bigbuf(self):
+        self._test_nonblock_pipe_write(16*1024)
+
+    @unittest.skipUnless(fcntl, 'fcntl required for this test')
+    def test_nonblock_pipe_write_smallbuf(self):
+        self._test_nonblock_pipe_write(1024)
+
+    def _set_non_blocking(self, fd):
+        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+        self.assertNotEqual(flags, -1)
+        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+        self.assertEqual(res, 0)
+
+    def _test_nonblock_pipe_write(self, bufsize):
+        sent = []
+        received = []
+        r, w = os.pipe()
+        self._set_non_blocking(r)
+        self._set_non_blocking(w)
+
+        # To exercise all code paths in the C implementation we need
+        # to play with buffer sizes.  For instance, if we choose a
+        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
+        # then we will never get a partial write of the buffer.
+        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
+        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
+
+        with rf, wf:
+            for N in 9999, 73, 7574:
+                try:
+                    i = 0
+                    while True:
+                        msg = bytes([i % 26 + 97]) * N
+                        sent.append(msg)
+                        wf.write(msg)
+                        i += 1
+
+                except self.BlockingIOError as e:
+                    self.assertEqual(e.args[0], errno.EAGAIN)
+                    sent[-1] = sent[-1][:e.characters_written]
+                    received.append(rf.read())
+                    msg = b'BLOCKED'
+                    wf.write(msg)
+                    sent.append(msg)
+
+            while True:
+                try:
+                    wf.flush()
+                    break
+                except self.BlockingIOError as e:
+                    self.assertEqual(e.args[0], errno.EAGAIN)
+                    self.assertEqual(e.characters_written, 0)
+                    received.append(rf.read())
+
+            received += iter(rf.read, None)
+
+        sent, received = b''.join(sent), b''.join(received)
+        self.assertTrue(sent == received)
+        self.assertTrue(wf.closed)
+        self.assertTrue(rf.closed)
+
 class CMiscIOTest(MiscIOTest):
     io = io
 
index c66edb7ab175e99e845dde811df22bcd6fa7a5c6..7f7296e0d050489dd73c38b0ad35e1e62da61fc8 100644 (file)
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -796,6 +796,7 @@ Ilya Sandler
 Mark Sapiro
 Ty Sarna
 Ben Sayer
+sbt
 Andrew Schaaf
 Michael Scharf
 Andreas Schawo
index c3765dd973bda544b6e6cf4be7ddc0d4ac80bd4d..4e43b4546af4c010150e8a22acb74eb7d4e4d7cb 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -80,6 +80,12 @@ Core and Builtins
 Library
 -------
 
+- Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is
+  raised when the wrapped raw file is non-blocking and the write would block.
+  Previous code assumed that the raw write() would raise BlockingIOError, but
+  RawIOBase.write() is defined to returned None when the call would block.
+  Patch by sbt.
+
 - Issue #13358: HTMLParser now calls handle_data only once for each CDATA.
 
 - Issue #4147: minidom's toprettyxml no longer adds whitespace around a text
index c979ac2336d19f3742eb9afecc13a95637ff07ce..4c43c69060e0eb2fff9e8da3990bb31f70a2d11b 100644 (file)
@@ -581,7 +581,7 @@ buffered_getstate(buffered *self, PyObject *args)
 
 /* Forward decls */
 static PyObject *
-_bufferedwriter_flush_unlocked(buffered *, int);
+_bufferedwriter_flush_unlocked(buffered *);
 static Py_ssize_t
 _bufferedreader_fill_buffer(buffered *self);
 static void
@@ -602,6 +602,18 @@ _bufferedreader_read_generic(buffered *self, Py_ssize_t);
  * Helpers
  */
 
+/* Sets the current error to BlockingIOError */
+static void
+_set_BlockingIOError(char *msg, Py_ssize_t written)
+{
+    PyObject *err;
+    err = PyObject_CallFunction(PyExc_BlockingIOError, "isn",
+                                errno, msg, written);
+    if (err)
+        PyErr_SetObject(PyExc_BlockingIOError, err);
+    Py_XDECREF(err);
+}
+
 /* Returns the address of the `written` member if a BlockingIOError was
    raised, NULL otherwise. The error is always re-raised. */
 static Py_ssize_t *
@@ -756,7 +768,7 @@ buffered_flush_and_rewind_unlocked(buffered *self)
 {
     PyObject *res;
 
-    res = _bufferedwriter_flush_unlocked(self, 0);
+    res = _bufferedwriter_flush_unlocked(self);
     if (res == NULL)
         return NULL;
     Py_DECREF(res);
@@ -1121,7 +1133,7 @@ buffered_seek(buffered *self, PyObject *args)
 
     /* Fallback: invoke raw seek() method and clear buffer */
     if (self->writable) {
-        res = _bufferedwriter_flush_unlocked(self, 0);
+        res = _bufferedwriter_flush_unlocked(self);
         if (res == NULL)
             goto end;
         Py_CLEAR(res);
@@ -1714,6 +1726,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
     Py_buffer buf;
     PyObject *memobj, *res;
     Py_ssize_t n;
+    int errnum;
     /* NOTE: the buffer needn't be released as its object is NULL. */
     if (PyBuffer_FillInfo(&buf, NULL, start, len, 1, PyBUF_CONTIG_RO) == -1)
         return -1;
@@ -1726,11 +1739,21 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
        raised (see issue #10956).
     */
     do {
+        errno = 0;
         res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL);
+        errnum = errno;
     } while (res == NULL && _trap_eintr());
     Py_DECREF(memobj);
     if (res == NULL)
         return -1;
+    if (res == Py_None) {
+        /* Non-blocking stream would have blocked. Special return code!
+           Being paranoid we reset errno in case it is changed by code
+           triggered by a decref.  errno is used by _set_BlockingIOError(). */
+        Py_DECREF(res);
+        errno = errnum;
+        return -2;
+    }
     n = PyNumber_AsSsize_t(res, PyExc_ValueError);
     Py_DECREF(res);
     if (n < 0 || n > len) {
@@ -1747,7 +1770,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
 /* `restore_pos` is 1 if we need to restore the raw stream position at
    the end, 0 otherwise. */
 static PyObject *
-_bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
+_bufferedwriter_flush_unlocked(buffered *self)
 {
     Py_ssize_t written = 0;
     Py_off_t n, rewind;
@@ -1769,14 +1792,11 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
             Py_SAFE_DOWNCAST(self->write_end - self->write_pos,
                              Py_off_t, Py_ssize_t));
         if (n == -1) {
-            Py_ssize_t *w = _buffered_check_blocking_error();
-            if (w == NULL)
-                goto error;
-            self->write_pos += *w;
-            self->raw_pos = self->write_pos;
-            written += *w;
-            *w = written;
-            /* Already re-raised */
+            goto error;
+        }
+        else if (n == -2) {
+            _set_BlockingIOError("write could not complete without blocking",
+                                 0);
             goto error;
         }
         self->write_pos += n;
@@ -1789,16 +1809,6 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
             goto error;
     }
 
-    if (restore_pos) {
-        Py_off_t forward = rewind - written;
-        if (forward != 0) {
-            n = _buffered_raw_seek(self, forward, 1);
-            if (n < 0) {
-                goto error;
-            }
-            self->raw_pos += forward;
-        }
-    }
     _bufferedwriter_reset_buf(self);
 
 end:
@@ -1851,7 +1861,7 @@ bufferedwriter_write(buffered *self, PyObject *args)
     }
 
     /* First write the current buffer */
-    res = _bufferedwriter_flush_unlocked(self, 0);
+    res = _bufferedwriter_flush_unlocked(self);
     if (res == NULL) {
         Py_ssize_t *w = _buffered_check_blocking_error();
         if (w == NULL)
@@ -1874,14 +1884,19 @@ bufferedwriter_write(buffered *self, PyObject *args)
             PyErr_Clear();
             memcpy(self->buffer + self->write_end, buf.buf, buf.len);
             self->write_end += buf.len;
+            self->pos += buf.len;
             written = buf.len;
             goto end;
         }
         /* Buffer as much as possible. */
         memcpy(self->buffer + self->write_end, buf.buf, avail);
         self->write_end += avail;
-        /* Already re-raised */
-        *w = avail;
+        self->pos += avail;
+        /* XXX Modifying the existing exception e using the pointer w
+           will change e.characters_written but not e.args[2].
+           Therefore we just replace with a new error. */
+        _set_BlockingIOError("write could not complete without blocking",
+                             avail);
         goto error;
     }
     Py_CLEAR(res);
@@ -1906,11 +1921,9 @@ bufferedwriter_write(buffered *self, PyObject *args)
         Py_ssize_t n = _bufferedwriter_raw_write(
             self, (char *) buf.buf + written, buf.len - written);
         if (n == -1) {
-            Py_ssize_t *w = _buffered_check_blocking_error();
-            if (w == NULL)
-                goto error;
-            written += *w;
-            remaining -= *w;
+            goto error;
+        } else if (n == -2) {
+            /* Write failed because raw file is non-blocking */
             if (remaining > self->buffer_size) {
                 /* Can't buffer everything, still buffer as much as possible */
                 memcpy(self->buffer,
@@ -1918,8 +1931,9 @@ bufferedwriter_write(buffered *self, PyObject *args)
                 self->raw_pos = 0;
                 ADJUST_POSITION(self, self->buffer_size);
                 self->write_end = self->buffer_size;
-                *w = written + self->buffer_size;
-                /* Already re-raised */
+                written += self->buffer_size;
+                _set_BlockingIOError("write could not complete without "
+                                     "blocking", written);
                 goto error;
             }
             PyErr_Clear();