]> granicus.if.org Git - python/commitdiff
bpo-33694: Fix race condition in asyncio proactor (GH-7498)
authorVictor Stinner <vstinner@redhat.com>
Thu, 7 Jun 2018 22:25:52 +0000 (00:25 +0200)
committerGitHub <noreply@github.com>
Thu, 7 Jun 2018 22:25:52 +0000 (00:25 +0200)
The cancellation of an overlapped WSARecv() has a race condition
which causes data loss because of the current implementation of
proactor in asyncio.

No longer cancel overlapped WSARecv() in _ProactorReadPipeTransport
to work around the race condition.

Remove the optimized recv_into() implementation to get simple
implementation of pause_reading() using the single _pending_data
attribute.

Move _feed_data_to_bufferred_proto() to protocols.py.

Remove set_protocol() method which became useless.

Lib/asyncio/proactor_events.py
Lib/asyncio/protocols.py
Lib/asyncio/sslproto.py
Lib/test/test_asyncio/test_proactor_events.py
Lib/test/test_asyncio/test_sslproto.py
Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst [new file with mode: 0644]

index 337ed0fb2047510fd651adb9a58bb72c2408f0eb..d9cfdff02c9ae19eaf048d1745573d1054548a6c 100644 (file)
@@ -159,27 +159,13 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
 
     def __init__(self, loop, sock, protocol, waiter=None,
                  extra=None, server=None):
-        self._loop_reading_cb = None
+        self._pending_data = None
         self._paused = True
         super().__init__(loop, sock, protocol, waiter, extra, server)
 
-        self._reschedule_on_resume = False
         self._loop.call_soon(self._loop_reading)
         self._paused = False
 
-    def set_protocol(self, protocol):
-        if isinstance(protocol, protocols.BufferedProtocol):
-            self._loop_reading_cb = self._loop_reading__get_buffer
-        else:
-            self._loop_reading_cb = self._loop_reading__data_received
-
-        super().set_protocol(protocol)
-
-        if self.is_reading():
-            # reset reading callback / buffers / self._read_fut
-            self.pause_reading()
-            self.resume_reading()
-
     def is_reading(self):
         return not self._paused and not self._closing
 
@@ -188,17 +174,16 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
             return
         self._paused = True
 
-        if self._read_fut is not None and not self._read_fut.done():
-            # TODO: This is an ugly hack to cancel the current read future
-            # *and* avoid potential race conditions, as read cancellation
-            # goes through `future.cancel()` and `loop.call_soon()`.
-            # We then use this special attribute in the reader callback to
-            # exit *immediately* without doing any cleanup/rescheduling.
-            self._read_fut.__asyncio_cancelled_on_pause__ = True
-
-            self._read_fut.cancel()
-            self._read_fut = None
-            self._reschedule_on_resume = True
+        # bpo-33694: Don't cancel self._read_fut because cancelling an
+        # overlapped WSASend() loss silently data with the current proactor
+        # implementation.
+        #
+        # If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend()
+        # completed (even if HasOverlappedIoCompleted() returns 0), but
+        # Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND
+        # error. Once the overlapped is ignored, the IOCP loop will ignores the
+        # completion I/O event and so not read the result of the overlapped
+        # WSARecv().
 
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
@@ -206,14 +191,22 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
     def resume_reading(self):
         if self._closing or not self._paused:
             return
+
         self._paused = False
-        if self._reschedule_on_resume:
-            self._loop.call_soon(self._loop_reading, self._read_fut)
-            self._reschedule_on_resume = False
+        if self._read_fut is None:
+            self._loop.call_soon(self._loop_reading, None)
+
+        data = self._pending_data
+        self._pending_data = None
+        if data is not None:
+            # Call the protocol methode after calling _loop_reading(),
+            # since the protocol can decide to pause reading again.
+            self._loop.call_soon(self._data_received, data)
+
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
 
-    def _loop_reading__on_eof(self):
+    def _eof_received(self):
         if self._loop.get_debug():
             logger.debug("%r received EOF", self)
 
@@ -227,18 +220,30 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
         if not keep_open:
             self.close()
 
-    def _loop_reading(self, fut=None):
-        self._loop_reading_cb(fut)
-
-    def _loop_reading__data_received(self, fut):
-        if (fut is not None and
-                getattr(fut, '__asyncio_cancelled_on_pause__', False)):
+    def _data_received(self, data):
+        if self._paused:
+            # Don't call any protocol method while reading is paused.
+            # The protocol will be called on resume_reading().
+            assert self._pending_data is None
+            self._pending_data = data
             return
 
-        if self._paused:
-            self._reschedule_on_resume = True
+        if not data:
+            self._eof_received()
             return
 
+        if isinstance(self._protocol, protocols.BufferedProtocol):
+            try:
+                protocols._feed_data_to_bufferred_proto(self._protocol, data)
+            except Exception as exc:
+                self._fatal_error(exc,
+                                  'Fatal error: protocol.buffer_updated() '
+                                  'call failed.')
+                return
+        else:
+            self._protocol.data_received(data)
+
+    def _loop_reading(self, fut=None):
         data = None
         try:
             if fut is not None:
@@ -261,8 +266,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
                 # we got end-of-file so no need to reschedule a new read
                 return
 
-            # reschedule a new read
-            self._read_fut = self._loop._proactor.recv(self._sock, 32768)
+            # bpo-33694: buffer_updated() has currently no fast path because of
+            # a data loss issue caused by overlapped WSASend() cancellation.
+
+            if not self._paused:
+                # reschedule a new read
+                self._read_fut = self._loop._proactor.recv(self._sock, 32768)
         except ConnectionAbortedError as exc:
             if not self._closing:
                 self._fatal_error(exc, 'Fatal read error on pipe transport')
@@ -277,92 +286,11 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
             if not self._closing:
                 raise
         else:
-            self._read_fut.add_done_callback(self._loop_reading__data_received)
+            if not self._paused:
+                self._read_fut.add_done_callback(self._loop_reading)
         finally:
-            if data:
-                self._protocol.data_received(data)
-            elif data == b'':
-                self._loop_reading__on_eof()
-
-    def _loop_reading__get_buffer(self, fut):
-        if (fut is not None and
-                getattr(fut, '__asyncio_cancelled_on_pause__', False)):
-            return
-
-        if self._paused:
-            self._reschedule_on_resume = True
-            return
-
-        nbytes = None
-        if fut is not None:
-            assert self._read_fut is fut or (self._read_fut is None and
-                                             self._closing)
-            self._read_fut = None
-            try:
-                if fut.done():
-                    nbytes = fut.result()
-                else:
-                    # the future will be replaced by next proactor.recv call
-                    fut.cancel()
-            except ConnectionAbortedError as exc:
-                if not self._closing:
-                    self._fatal_error(
-                        exc, 'Fatal read error on pipe transport')
-                elif self._loop.get_debug():
-                    logger.debug("Read error on pipe transport while closing",
-                                 exc_info=True)
-            except ConnectionResetError as exc:
-                self._force_close(exc)
-            except OSError as exc:
-                self._fatal_error(exc, 'Fatal read error on pipe transport')
-            except futures.CancelledError:
-                if not self._closing:
-                    raise
-
-            if nbytes is not None:
-                if nbytes == 0:
-                    # we got end-of-file so no need to reschedule a new read
-                    self._loop_reading__on_eof()
-                else:
-                    try:
-                        self._protocol.buffer_updated(nbytes)
-                    except Exception as exc:
-                        self._fatal_error(
-                            exc,
-                            'Fatal error: '
-                            'protocol.buffer_updated() call failed.')
-                        return
-
-        if self._closing or nbytes == 0:
-            # since close() has been called we ignore any read data
-            return
-
-        try:
-            buf = self._protocol.get_buffer(-1)
-            if not len(buf):
-                raise RuntimeError('get_buffer() returned an empty buffer')
-        except Exception as exc:
-            self._fatal_error(
-                exc, 'Fatal error: protocol.get_buffer() call failed.')
-            return
-
-        try:
-            # schedule a new read
-            self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
-            self._read_fut.add_done_callback(self._loop_reading__get_buffer)
-        except ConnectionAbortedError as exc:
-            if not self._closing:
-                self._fatal_error(exc, 'Fatal read error on pipe transport')
-            elif self._loop.get_debug():
-                logger.debug("Read error on pipe transport while closing",
-                             exc_info=True)
-        except ConnectionResetError as exc:
-            self._force_close(exc)
-        except OSError as exc:
-            self._fatal_error(exc, 'Fatal read error on pipe transport')
-        except futures.CancelledError:
-            if not self._closing:
-                raise
+            if data is not None:
+                self._data_received(data)
 
 
 class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
index b8d2e6be552e1e338d5a4e5d3921adbcfeb1618f..4d47da387caa3f5826a67e21fa01bd8494a623d5 100644 (file)
@@ -189,3 +189,22 @@ class SubprocessProtocol(BaseProtocol):
 
     def process_exited(self):
         """Called when subprocess has exited."""
+
+
+def _feed_data_to_bufferred_proto(proto, data):
+    data_len = len(data)
+    while data_len:
+        buf = proto.get_buffer(data_len)
+        buf_len = len(buf)
+        if not buf_len:
+            raise RuntimeError('get_buffer() returned an empty buffer')
+
+        if buf_len >= data_len:
+            buf[:data_len] = data
+            proto.buffer_updated(data_len)
+            return
+        else:
+            buf[:buf_len] = data[:buf_len]
+            proto.buffer_updated(buf_len)
+            data = data[buf_len:]
+            data_len = len(data)
index fac2ae74e808b884e761102bfc89c1dec7267c71..5578c6f81834ae666d0deb7d706ccd0cf347398f 100644 (file)
@@ -535,7 +535,7 @@ class SSLProtocol(protocols.Protocol):
             if chunk:
                 try:
                     if self._app_protocol_is_buffer:
-                        _feed_data_to_bufferred_proto(
+                        protocols._feed_data_to_bufferred_proto(
                             self._app_protocol, chunk)
                     else:
                         self._app_protocol.data_received(chunk)
@@ -721,22 +721,3 @@ class SSLProtocol(protocols.Protocol):
                 self._transport.abort()
         finally:
             self._finalize()
-
-
-def _feed_data_to_bufferred_proto(proto, data):
-    data_len = len(data)
-    while data_len:
-        buf = proto.get_buffer(data_len)
-        buf_len = len(buf)
-        if not buf_len:
-            raise RuntimeError('get_buffer() returned an empty buffer')
-
-        if buf_len >= data_len:
-            buf[:data_len] = data
-            proto.buffer_updated(data_len)
-            return
-        else:
-            buf[:buf_len] = data[:buf_len]
-            proto.buffer_updated(buf_len)
-            data = data[buf_len:]
-            data_len = len(data)
index 26588634de04a7bb3ef2d048c4d12eda01b51fa8..ac529413c2756bb0477a89daaebc6b2bebf0b065 100644 (file)
@@ -459,6 +459,8 @@ class ProactorSocketTransportTests(test_utils.TestCase):
         self.assertFalse(self.protocol.pause_writing.called)
 
 
+@unittest.skip('FIXME: bpo-33694: these tests are too close '
+               'to the implementation and should be refactored or removed')
 class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):
 
     def setUp(self):
@@ -551,6 +553,8 @@ class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):
         self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
         buf_proto.buffer_updated.assert_called_with(4)
 
+    @unittest.skip('FIXME: bpo-33694: this test is too close to the '
+                   'implementation and should be refactored or removed')
     def test_proto_buf_switch(self):
         tr = self.socket_transport()
         test_utils.run_briefly(self.loop)
index 78ab1eb8223148f2c147cbb3174a87cd1539fbdc..4ace48f9988894d4b0fda084a7aac69a7663dcf4 100644 (file)
@@ -11,6 +11,7 @@ except ImportError:
 
 import asyncio
 from asyncio import log
+from asyncio import protocols
 from asyncio import sslproto
 from asyncio import tasks
 from test.test_asyncio import utils as test_utils
@@ -189,28 +190,28 @@ class BaseStartTLS(func_tests.FunctionalTestCaseMixin):
 
         for usemv in [False, True]:
             proto = Proto(1, usemv)
-            sslproto._feed_data_to_bufferred_proto(proto, b'12345')
+            protocols._feed_data_to_bufferred_proto(proto, b'12345')
             self.assertEqual(proto.data, b'12345')
 
             proto = Proto(2, usemv)
-            sslproto._feed_data_to_bufferred_proto(proto, b'12345')
+            protocols._feed_data_to_bufferred_proto(proto, b'12345')
             self.assertEqual(proto.data, b'12345')
 
             proto = Proto(2, usemv)
-            sslproto._feed_data_to_bufferred_proto(proto, b'1234')
+            protocols._feed_data_to_bufferred_proto(proto, b'1234')
             self.assertEqual(proto.data, b'1234')
 
             proto = Proto(4, usemv)
-            sslproto._feed_data_to_bufferred_proto(proto, b'1234')
+            protocols._feed_data_to_bufferred_proto(proto, b'1234')
             self.assertEqual(proto.data, b'1234')
 
             proto = Proto(100, usemv)
-            sslproto._feed_data_to_bufferred_proto(proto, b'12345')
+            protocols._feed_data_to_bufferred_proto(proto, b'12345')
             self.assertEqual(proto.data, b'12345')
 
             proto = Proto(0, usemv)
             with self.assertRaisesRegex(RuntimeError, 'empty buffer'):
-                sslproto._feed_data_to_bufferred_proto(proto, b'12345')
+                protocols._feed_data_to_bufferred_proto(proto, b'12345')
 
     def test_start_tls_client_reg_proto_1(self):
         HELLO_MSG = b'1' * self.PAYLOAD_SIZE
diff --git a/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst b/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst
new file mode 100644 (file)
index 0000000..6b7f15c
--- /dev/null
@@ -0,0 +1,2 @@
+asyncio: Fix a race condition causing data loss on
+pause_reading()/resume_reading() when using the ProactorEventLoop.