]> granicus.if.org Git - python/commitdiff
Important race condition fix for Tulip.
authorGuido van Rossum <guido@dropbox.com>
Fri, 18 Oct 2013 17:10:36 +0000 (10:10 -0700)
committerGuido van Rossum <guido@dropbox.com>
Fri, 18 Oct 2013 17:10:36 +0000 (10:10 -0700)
Lib/asyncio/selector_events.py

index 2edac65bb52fd32cc5cc778229e26f6a415f2e3d..084d9be7db7a1160443b609dab75c80c60a5c896 100644 (file)
@@ -344,7 +344,7 @@ class _SelectorTransport(transports.Transport):
         self._protocol = protocol
         self._server = server
         self._buffer = collections.deque()
-        self._conn_lost = 0
+        self._conn_lost = 0  # Set when call to connection_lost scheduled.
         self._closing = False  # Set when close() called.
         if server is not None:
             server.attach(self)
@@ -356,27 +356,27 @@ class _SelectorTransport(transports.Transport):
         if self._closing:
             return
         self._closing = True
-        self._conn_lost += 1
         self._loop.remove_reader(self._sock_fd)
         if not self._buffer:
+            self._conn_lost += 1
             self._loop.call_soon(self._call_connection_lost, None)
 
     def _fatal_error(self, exc):
-        # should be called from exception handler only
-        logger.exception('Fatal error for %s', self)
+        # Should be called from exception handler only.
+        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+            logger.exception('Fatal error for %s', self)
         self._force_close(exc)
 
     def _force_close(self, exc):
+        if self._conn_lost:
+            return
         if self._buffer:
             self._buffer.clear()
             self._loop.remove_writer(self._sock_fd)
-
-        if self._closing:
-            return
-
-        self._closing = True
+        if not self._closing:
+            self._closing = True
+            self._loop.remove_reader(self._sock_fd)
         self._conn_lost += 1
-        self._loop.remove_reader(self._sock_fd)
         self._loop.call_soon(self._call_connection_lost, exc)
 
     def _call_connection_lost(self, exc):
@@ -424,8 +424,6 @@ class _SelectorSocketTransport(_SelectorTransport):
             data = self._sock.recv(self.max_size)
         except (BlockingIOError, InterruptedError):
             pass
-        except ConnectionResetError as exc:
-            self._force_close(exc)
         except Exception as exc:
             self._fatal_error(exc)
         else:
@@ -453,17 +451,15 @@ class _SelectorSocketTransport(_SelectorTransport):
             try:
                 n = self._sock.send(data)
             except (BlockingIOError, InterruptedError):
-                n = 0
-            except (BrokenPipeError, ConnectionResetError) as exc:
-                self._force_close(exc)
-                return
-            except OSError as exc:
+                pass
+            except Exception as exc:
                 self._fatal_error(exc)
                 return
             else:
                 data = data[n:]
                 if not data:
                     return
+
             # Start async I/O.
             self._loop.add_writer(self._sock_fd, self._write_ready)
 
@@ -478,9 +474,6 @@ class _SelectorSocketTransport(_SelectorTransport):
             n = self._sock.send(data)
         except (BlockingIOError, InterruptedError):
             self._buffer.append(data)
-        except (BrokenPipeError, ConnectionResetError) as exc:
-            self._loop.remove_writer(self._sock_fd)
-            self._force_close(exc)
         except Exception as exc:
             self._loop.remove_writer(self._sock_fd)
             self._fatal_error(exc)
@@ -493,7 +486,6 @@ class _SelectorSocketTransport(_SelectorTransport):
                 elif self._eof:
                     self._sock.shutdown(socket.SHUT_WR)
                 return
-
             self._buffer.append(data)  # Try again later.
 
     def write_eof(self):
@@ -622,8 +614,6 @@ class _SelectorSslTransport(_SelectorTransport):
             except (BlockingIOError, InterruptedError,
                     ssl.SSLWantReadError, ssl.SSLWantWriteError):
                 pass
-            except ConnectionResetError as exc:
-                self._force_close(exc)
             except Exception as exc:
                 self._fatal_error(exc)
             else:
@@ -644,10 +634,6 @@ class _SelectorSslTransport(_SelectorTransport):
             except (BlockingIOError, InterruptedError,
                     ssl.SSLWantReadError, ssl.SSLWantWriteError):
                 n = 0
-            except (BrokenPipeError, ConnectionResetError) as exc:
-                self._loop.remove_writer(self._sock_fd)
-                self._force_close(exc)
-                return
             except Exception as exc:
                 self._loop.remove_writer(self._sock_fd)
                 self._fatal_error(exc)
@@ -726,12 +712,12 @@ class _SelectorDatagramTransport(_SelectorTransport):
                 else:
                     self._sock.sendto(data, addr)
                 return
+            except (BlockingIOError, InterruptedError):
+                self._loop.add_writer(self._sock_fd, self._sendto_ready)
             except ConnectionRefusedError as exc:
                 if self._address:
                     self._fatal_error(exc)
                 return
-            except (BlockingIOError, InterruptedError):
-                self._loop.add_writer(self._sock_fd, self._sendto_ready)
             except Exception as exc:
                 self._fatal_error(exc)
                 return
@@ -746,13 +732,13 @@ class _SelectorDatagramTransport(_SelectorTransport):
                     self._sock.send(data)
                 else:
                     self._sock.sendto(data, addr)
+            except (BlockingIOError, InterruptedError):
+                self._buffer.appendleft((data, addr))  # Try again later.
+                break
             except ConnectionRefusedError as exc:
                 if self._address:
                     self._fatal_error(exc)
                 return
-            except (BlockingIOError, InterruptedError):
-                self._buffer.appendleft((data, addr))  # Try again later.
-                break
             except Exception as exc:
                 self._fatal_error(exc)
                 return
@@ -765,5 +751,4 @@ class _SelectorDatagramTransport(_SelectorTransport):
     def _force_close(self, exc):
         if self._address and isinstance(exc, ConnectionRefusedError):
             self._protocol.connection_refused(exc)
-
         super()._force_close(exc)