]> granicus.if.org Git - python/commitdiff
Issue #27906: Fix socket accept exhaustion during high TCP traffic.
authorYury Selivanov <yury@magic.io>
Thu, 15 Sep 2016 18:13:15 +0000 (14:13 -0400)
committerYury Selivanov <yury@magic.io>
Thu, 15 Sep 2016 18:13:15 +0000 (14:13 -0400)
Patch by Kevin Conway.

Lib/asyncio/base_events.py
Lib/asyncio/proactor_events.py
Lib/asyncio/selector_events.py
Lib/test/test_asyncio/test_base_events.py
Lib/test/test_asyncio/test_selector_events.py
Misc/NEWS

index bc3e01293989121a754b54d047a986e625c2fb75..8d926dc901edd645fe3fb4a43556fdcdff119a46 100644 (file)
@@ -1034,7 +1034,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         for sock in sockets:
             sock.listen(backlog)
             sock.setblocking(False)
-            self._start_serving(protocol_factory, sock, ssl, server)
+            self._start_serving(protocol_factory, sock, ssl, server, backlog)
         if self._debug:
             logger.info("%r is serving", server)
         return server
index 97ab487f974c3fcb4698a27b64b2249ffb850aa3..fef3205877f9944cf2c6fe6f462ca040d44005ef 100644 (file)
@@ -494,7 +494,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
         self._csock.send(b'\0')
 
     def _start_serving(self, protocol_factory, sock,
-                       sslcontext=None, server=None):
+                       sslcontext=None, server=None, backlog=100):
 
         def loop(f=None):
             try:
index c91ab04f3c08f61b7295abcb7f5c3dcc7429d15d..c18885ebf223c76f9bc01f0e4d555168b5e6aae5 100644 (file)
@@ -162,43 +162,50 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
                                  exc_info=True)
 
     def _start_serving(self, protocol_factory, sock,
-                       sslcontext=None, server=None):
+                       sslcontext=None, server=None, backlog=100):
         self.add_reader(sock.fileno(), self._accept_connection,
-                        protocol_factory, sock, sslcontext, server)
+                        protocol_factory, sock, sslcontext, server, backlog)
 
     def _accept_connection(self, protocol_factory, sock,
-                           sslcontext=None, server=None):
-        try:
-            conn, addr = sock.accept()
-            if self._debug:
-                logger.debug("%r got a new connection from %r: %r",
-                             server, addr, conn)
-            conn.setblocking(False)
-        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
-            pass  # False alarm.
-        except OSError as exc:
-            # There's nowhere to send the error, so just log it.
-            if exc.errno in (errno.EMFILE, errno.ENFILE,
-                             errno.ENOBUFS, errno.ENOMEM):
-                # Some platforms (e.g. Linux keep reporting the FD as
-                # ready, so we remove the read handler temporarily.
-                # We'll try again in a while.
-                self.call_exception_handler({
-                    'message': 'socket.accept() out of system resource',
-                    'exception': exc,
-                    'socket': sock,
-                })
-                self.remove_reader(sock.fileno())
-                self.call_later(constants.ACCEPT_RETRY_DELAY,
-                                self._start_serving,
-                                protocol_factory, sock, sslcontext, server)
+                           sslcontext=None, server=None, backlog=100):
+        # This method is only called once for each event loop tick where the
+        # listening socket has triggered an EVENT_READ. There may be multiple
+        # connections waiting for an .accept() so it is called in a loop.
+        # See https://bugs.python.org/issue27906 for more details.
+        for _ in range(backlog):
+            try:
+                conn, addr = sock.accept()
+                if self._debug:
+                    logger.debug("%r got a new connection from %r: %r",
+                                 server, addr, conn)
+                conn.setblocking(False)
+            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
+                # Early exit because the socket accept buffer is empty.
+                return None
+            except OSError as exc:
+                # There's nowhere to send the error, so just log it.
+                if exc.errno in (errno.EMFILE, errno.ENFILE,
+                                 errno.ENOBUFS, errno.ENOMEM):
+                    # Some platforms (e.g. Linux keep reporting the FD as
+                    # ready, so we remove the read handler temporarily.
+                    # We'll try again in a while.
+                    self.call_exception_handler({
+                        'message': 'socket.accept() out of system resource',
+                        'exception': exc,
+                        'socket': sock,
+                    })
+                    self.remove_reader(sock.fileno())
+                    self.call_later(constants.ACCEPT_RETRY_DELAY,
+                                    self._start_serving,
+                                    protocol_factory, sock, sslcontext, server,
+                                    backlog)
+                else:
+                    raise  # The event loop will catch, log and ignore it.
             else:
-                raise  # The event loop will catch, log and ignore it.
-        else:
-            extra = {'peername': addr}
-            accept = self._accept_connection2(protocol_factory, conn, extra,
-                                              sslcontext, server)
-            self.create_task(accept)
+                extra = {'peername': addr}
+                accept = self._accept_connection2(protocol_factory, conn, extra,
+                                                  sslcontext, server)
+                self.create_task(accept)
 
     @coroutine
     def _accept_connection2(self, protocol_factory, conn, extra,
index 206ebc69fe4476a81a9bde84c517afde45f5d9e3..0efdc202df135ae438513c8a61d74c7c87b28f42 100644 (file)
@@ -1634,7 +1634,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
         self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
                                                 # self.loop._start_serving
                                                 mock.ANY,
-                                                MyProto, sock, None, None)
+                                                MyProto, sock, None, None, mock.ANY)
 
     def test_call_coroutine(self):
         @asyncio.coroutine
index ff71c218bb1659909aa1fc2b928ddb2c033a508c..73bc3f3281f46bd311e361fd06accb38d2b04238 100644 (file)
@@ -687,6 +687,20 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
               selectors.EVENT_WRITE)])
         self.loop.remove_writer.assert_called_with(1)
 
+    def test_accept_connection_multiple(self):
+        sock = mock.Mock()
+        sock.accept.return_value = (mock.Mock(), mock.Mock())
+        backlog = 100
+        # Mock the coroutine generation for a connection to prevent
+        # warnings related to un-awaited coroutines.
+        mock_obj = mock.patch.object
+        with mock_obj(self.loop, '_accept_connection2') as accept2_mock:
+            accept2_mock.return_value = None
+            with mock_obj(self.loop, 'create_task') as task_mock:
+                task_mock.return_value = None
+                self.loop._accept_connection(mock.Mock(), sock, backlog=backlog)
+        self.assertEqual(sock.accept.call_count, backlog)
+
 
 class SelectorTransportTests(test_utils.TestCase):
 
index 4bd55c3b4c81f1afd4859a3ee3b6e54bc50099be..0ebf5408449a3c114f82272eac103abca601a53f 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -263,6 +263,9 @@ Library
 
 - Issue #27456: asyncio: Set TCP_NODELAY by default.
 
+- Issue #27906: Fix socket accept exhaustion during high TCP traffic.
+  Patch by Kevin Conway.
+
 IDLE
 ----