Creating listening connections
------------------------------
-.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None)
+.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to
*host* and *port*.
for the SSL handshake to complete before aborting the connection.
``10.0`` seconds if ``None`` (default).
+ * *start_serving* set to ``True`` (the default) causes the created server
+ to start accepting connections immediately. When set to ``False``,
+ the user should await on :meth:`Server.start_serving` or
+ :meth:`Server.serve_forever` to make the server to start accepting
+ connections.
+
.. versionadded:: 3.7
- The *ssl_handshake_timeout* parameter.
+ *ssl_handshake_timeout* and *start_serving* parameters.
.. versionchanged:: 3.5
The *host* parameter can now be a sequence of strings.
-.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None)
+.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
Similar to :meth:`AbstractEventLoop.create_server`, but specific to the
socket family :py:data:`~socket.AF_UNIX`.
Server listening on sockets.
- Object created by the :meth:`AbstractEventLoop.create_server` method and the
- :func:`start_server` function. Don't instantiate the class directly.
+ Object created by :meth:`AbstractEventLoop.create_server`,
+ :meth:`AbstractEventLoop.create_unix_server`, :func:`start_server`,
+ and :func:`start_unix_server` functions. Don't instantiate the class
+ directly.
+
+ *Server* objects are asynchronous context managers. When used in an
+ ``async with`` statement, it's guaranteed that the Server object is
+ closed and not accepting new connections when the ``async with``
+ statement is completed::
+
+ srv = await loop.create_server(...)
+
+ async with srv:
+ # some code
+
+ # At this point, srv is closed and no longer accepts new connections.
+
+
+ .. versionchanged:: 3.7
+ Server object is an asynchronous context manager since Python 3.7.
.. method:: close()
.. versionadded:: 3.7
+ .. coroutinemethod:: start_serving()
+
+ Start accepting connections.
+
+ This method is idempotent, so it can be called when
+ the server is already being serving.
+
+ The new *start_serving* keyword-only parameter to
+ :meth:`AbstractEventLoop.create_server` and
+ :meth:`asyncio.start_server` allows to create a Server object
+ that is not accepting connections right away. In which case
+ this method, or :meth:`Server.serve_forever` can be used
+ to make the Server object to start accepting connections.
+
+ .. versionadded:: 3.7
+
+ .. coroutinemethod:: serve_forever()
+
+ Start accepting connections until the coroutine is cancelled.
+ Cancellation of ``serve_forever`` task causes the server
+ to be closed.
+
+ This method can be called if the server is already accepting
+ connections. Only one ``serve_forever`` task can exist per
+ one *Server* object.
+
+ Example::
+
+ async def client_connected(reader, writer):
+ # Communicate with the client with
+ # reader/writer streams. For example:
+ await reader.readline()
+
+ async def main(host, port):
+ srv = await asyncio.start_server(
+ client_connected, host, port)
+ await loop.serve_forever()
+
+ asyncio.run(main('127.0.0.1', 0))
+
+ .. versionadded:: 3.7
+
+ .. method:: is_serving()
+
+ Return ``True`` if the server is accepting new connections.
+
+ .. versionadded:: 3.7
+
.. coroutinemethod:: wait_closed()
Wait until the :meth:`close` method completes.
List of :class:`socket.socket` objects the server is listening to, or
``None`` if the server is closed.
+ .. versionchanged:: 3.7
+ Prior to Python 3.7 ``Server.sockets`` used to return the
+ internal list of server's sockets directly. In 3.7 a copy
+ of that list is returned.
+
Handle
------
class Server(events.AbstractServer):
- def __init__(self, loop, sockets):
+ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
+ ssl_handshake_timeout):
self._loop = loop
- self.sockets = sockets
+ self._sockets = sockets
self._active_count = 0
self._waiters = []
+ self._protocol_factory = protocol_factory
+ self._backlog = backlog
+ self._ssl_context = ssl_context
+ self._ssl_handshake_timeout = ssl_handshake_timeout
+ self._serving = False
+ self._serving_forever_fut = None
def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
def _attach(self):
- assert self.sockets is not None
+ assert self._sockets is not None
self._active_count += 1
def _detach(self):
assert self._active_count > 0
self._active_count -= 1
- if self._active_count == 0 and self.sockets is None:
+ if self._active_count == 0 and self._sockets is None:
self._wakeup()
+ def _wakeup(self):
+ waiters = self._waiters
+ self._waiters = None
+ for waiter in waiters:
+ if not waiter.done():
+ waiter.set_result(waiter)
+
+ def _start_serving(self):
+ if self._serving:
+ return
+ self._serving = True
+ for sock in self._sockets:
+ sock.listen(self._backlog)
+ self._loop._start_serving(
+ self._protocol_factory, sock, self._ssl_context,
+ self, self._backlog, self._ssl_handshake_timeout)
+
+ def get_loop(self):
+ return self._loop
+
+ def is_serving(self):
+ return self._serving
+
+ @property
+ def sockets(self):
+ if self._sockets is None:
+ return []
+ return list(self._sockets)
+
def close(self):
- sockets = self.sockets
+ sockets = self._sockets
if sockets is None:
return
- self.sockets = None
+ self._sockets = None
+
for sock in sockets:
self._loop._stop_serving(sock)
+
+ self._serving = False
+
+ if (self._serving_forever_fut is not None and
+ not self._serving_forever_fut.done()):
+ self._serving_forever_fut.cancel()
+ self._serving_forever_fut = None
+
if self._active_count == 0:
self._wakeup()
- def get_loop(self):
- return self._loop
+ async def start_serving(self):
+ self._start_serving()
- def _wakeup(self):
- waiters = self._waiters
- self._waiters = None
- for waiter in waiters:
- if not waiter.done():
- waiter.set_result(waiter)
+ async def serve_forever(self):
+ if self._serving_forever_fut is not None:
+ raise RuntimeError(
+ f'server {self!r} is already being awaited on serve_forever()')
+ if self._sockets is None:
+ raise RuntimeError(f'server {self!r} is closed')
+
+ self._start_serving()
+ self._serving_forever_fut = self._loop.create_future()
+
+ try:
+ await self._serving_forever_fut
+ except futures.CancelledError:
+ try:
+ self.close()
+ await self.wait_closed()
+ finally:
+ raise
+ finally:
+ self._serving_forever_fut = None
async def wait_closed(self):
- if self.sockets is None or self._waiters is None:
+ if self._sockets is None or self._waiters is None:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
ssl=None,
reuse_address=None,
reuse_port=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ start_serving=True):
"""Create a TCP server.
The host parameter can be a string, in that case the TCP server is
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
sockets = [sock]
- server = Server(self, sockets)
for sock in sockets:
- sock.listen(backlog)
sock.setblocking(False)
- self._start_serving(protocol_factory, sock, ssl, server, backlog,
- ssl_handshake_timeout)
+
+ server = Server(self, sockets, protocol_factory,
+ ssl, backlog, ssl_handshake_timeout)
+ if start_serving:
+ server._start_serving()
+
if self._debug:
logger.info("%r is serving", server)
return server
"""Stop serving. This leaves existing connections open."""
raise NotImplementedError
+ def get_loop(self):
+ """Get the event loop the Server object is attached to."""
+ raise NotImplementedError
+
+ def is_serving(self):
+ """Return True if the server is accepting connections."""
+ raise NotImplementedError
+
+ async def start_serving(self):
+ """Start accepting connections.
+
+ This method is idempotent, so it can be called when
+ the server is already being serving.
+ """
+ raise NotImplementedError
+
+ async def serve_forever(self):
+ """Start accepting connections until the coroutine is cancelled.
+
+ The server is closed when the coroutine is cancelled.
+ """
+ raise NotImplementedError
+
async def wait_closed(self):
"""Coroutine to wait until service is closed."""
raise NotImplementedError
- def get_loop(self):
- """ Get the event loop the Server object is attached to."""
- raise NotImplementedError
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, *exc):
+ self.close()
+ await self.wait_closed()
class AbstractEventLoop:
*, family=socket.AF_UNSPEC,
flags=socket.AI_PASSIVE, sock=None, backlog=100,
ssl=None, reuse_address=None, reuse_port=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ start_serving=True):
"""A coroutine which creates a TCP server bound to host and port.
The return value is a Server object which can be used to stop
will wait for completion of the SSL handshake before aborting the
connection. Default is 10s, longer timeouts may increase vulnerability
to DoS attacks (see https://support.f5.com/csp/article/K13834)
+
+ start_serving set to True (default) causes the created server
+ to start accepting connections immediately. When set to False,
+ the user should await Server.start_serving() or Server.serve_forever()
+ to make the server to start accepting connections.
"""
raise NotImplementedError
async def create_unix_server(
self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ start_serving=True):
"""A coroutine which creates a UNIX Domain Socket server.
The return value is a Server object, which can be used to stop
ssl_handshake_timeout is the time in seconds that an SSL server
will wait for the SSL handshake to complete (defaults to 10s).
+
+ start_serving set to True (default) causes the created server
+ to start accepting connections immediately. When set to False,
+ the user should await Server.start_serving() or Server.serve_forever()
+ to make the server to start accepting connections.
"""
raise NotImplementedError
async def create_unix_server(
self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ start_serving=True):
if isinstance(ssl, bool):
raise TypeError('ssl argument must be an SSLContext or None')
raise ValueError(
f'A UNIX Domain Stream Socket was expected, got {sock!r}')
- server = base_events.Server(self, [sock])
- sock.listen(backlog)
sock.setblocking(False)
- self._start_serving(protocol_factory, sock, ssl, server,
- ssl_handshake_timeout=ssl_handshake_timeout)
+ server = base_events.Server(self, [sock], protocol_factory,
+ ssl, backlog, ssl_handshake_timeout)
+ if start_serving:
+ server._start_serving()
+
return server
async def _sock_sendfile_native(self, sock, file, offset, count):
--- /dev/null
+import asyncio
+import socket
+import threading
+import unittest
+
+from test.test_asyncio import utils as test_utils
+from test.test_asyncio import functional as func_tests
+
+
+class BaseStartServer(func_tests.FunctionalTestCaseMixin):
+
+ def new_loop(self):
+ raise NotImplementedError
+
+ def test_start_server_1(self):
+ HELLO_MSG = b'1' * 1024 * 5 + b'\n'
+
+ def client(sock, addr):
+ sock.connect(addr)
+ sock.send(HELLO_MSG)
+ sock.recv_all(1)
+ sock.close()
+
+ async def serve(reader, writer):
+ await reader.readline()
+ main_task.cancel()
+ writer.write(b'1')
+ writer.close()
+ await writer.wait_closed()
+
+ async def main(srv):
+ async with srv:
+ await srv.serve_forever()
+
+ srv = self.loop.run_until_complete(asyncio.start_server(
+ serve, '127.0.0.1', 0, loop=self.loop, start_serving=False))
+
+ self.assertFalse(srv.is_serving())
+
+ main_task = self.loop.create_task(main(srv))
+
+ addr = srv.sockets[0].getsockname()
+ with self.assertRaises(asyncio.CancelledError):
+ with self.tcp_client(lambda sock: client(sock, addr)):
+ self.loop.run_until_complete(main_task)
+
+ self.assertEqual(srv.sockets, [])
+
+ self.assertIsNone(srv._sockets)
+ self.assertIsNone(srv._waiters)
+ self.assertFalse(srv.is_serving())
+
+ with self.assertRaisesRegex(RuntimeError, r'is closed'):
+ self.loop.run_until_complete(srv.serve_forever())
+
+
+class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
+
+ def new_loop(self):
+ return asyncio.SelectorEventLoop()
+
+ @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'no Unix sockets')
+ def test_start_unix_server_1(self):
+ HELLO_MSG = b'1' * 1024 * 5 + b'\n'
+ started = threading.Event()
+
+ def client(sock, addr):
+ started.wait(5)
+ sock.connect(addr)
+ sock.send(HELLO_MSG)
+ sock.recv_all(1)
+ sock.close()
+
+ async def serve(reader, writer):
+ await reader.readline()
+ main_task.cancel()
+ writer.write(b'1')
+ writer.close()
+ await writer.wait_closed()
+
+ async def main(srv):
+ async with srv:
+ self.assertFalse(srv.is_serving())
+ await srv.start_serving()
+ self.assertTrue(srv.is_serving())
+ started.set()
+ await srv.serve_forever()
+
+ with test_utils.unix_socket_path() as addr:
+ srv = self.loop.run_until_complete(asyncio.start_unix_server(
+ serve, addr, loop=self.loop, start_serving=False))
+
+ main_task = self.loop.create_task(main(srv))
+
+ with self.assertRaises(asyncio.CancelledError):
+ with self.unix_client(lambda sock: client(sock, addr)):
+ self.loop.run_until_complete(main_task)
+
+ self.assertEqual(srv.sockets, [])
+
+ self.assertIsNone(srv._sockets)
+ self.assertIsNone(srv._waiters)
+ self.assertFalse(srv.is_serving())
+
+ with self.assertRaisesRegex(RuntimeError, r'is closed'):
+ self.loop.run_until_complete(srv.serve_forever())
+
+
+@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
+class ProactorStartServerTests(BaseStartServer, unittest.TestCase):
+
+ def new_loop(self):
+ return asyncio.ProactorEventLoop()
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+Implement Server.start_serving(), Server.serve_forever(), and
+Server.is_serving() methods. Add 'start_serving' keyword parameter to
+loop.create_server() and loop.create_unix_server().