import asyncio
async def tcp_echo_client(message):
- reader, writer = await asyncio.open_connection(
- '127.0.0.1', 8888)
+ async with asyncio.connect('127.0.0.1', 8888) as stream:
+ print(f'Send: {message!r}')
+ await stream.write(message.encode())
- print(f'Send: {message!r}')
- await writer.write(message.encode())
-
- data = await reader.read(100)
- print(f'Received: {data.decode()!r}')
-
- print('Close the connection')
- await writer.close()
+ data = await stream.read(100)
+ print(f'Received: {data.decode()!r}')
asyncio.run(tcp_echo_client('Hello World!'))
and work with streams:
+.. coroutinefunction:: connect(host=None, port=None, \*, \
+ limit=2**16, ssl=None, family=0, \
+ proto=0, flags=0, sock=None, local_addr=None, \
+ server_hostname=None, ssl_handshake_timeout=None, \
+ happy_eyeballs_delay=None, interleave=None)
+
+ Connect to TCP socket on *host* : *port* address and return a :class:`Stream`
+ object of mode :attr:`StreamMode.READWRITE`.
+
+
+ *limit* determines the buffer size limit used by the returned :class:`Stream`
+ instance. By default the *limit* is set to 64 KiB.
+
+ The rest of the arguments are passed directly to :meth:`loop.create_connection`.
+
+ The function can be used with ``await`` to get a connected stream::
+
+ stream = await asyncio.connect('127.0.0.1', 8888)
+
+ The function can also be used as an async context manager::
+
+ async with asyncio.connect('127.0.0.1', 8888) as stream:
+ ...
+
+ .. versionadded:: 3.8
+
.. coroutinefunction:: open_connection(host=None, port=None, \*, \
loop=None, limit=None, ssl=None, family=0, \
proto=0, flags=0, sock=None, local_addr=None, \
.. deprecated-removed:: 3.8 3.10
- `open_connection()` is deprecated in favor of `connect()`.
+ `open_connection()` is deprecated in favor of :func:`connect`.
.. coroutinefunction:: start_server(client_connected_cb, host=None, \
- port=None, \*, loop=None, limit=None, \
+ port=None, \*, loop=None, limit=2**16, \
family=socket.AF_UNSPEC, \
flags=socket.AI_PASSIVE, sock=None, \
backlog=100, ssl=None, reuse_address=None, \
.. deprecated-removed:: 3.8 3.10
- `start_server()` is deprecated if favor of `StreamServer()`
+ `start_server()` is deprecated if favor of :class:`StreamServer`
+
+.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)
+
+ Takes a :term:`file-like object <file object>` *pipe* to return a
+ :class:`Stream` object of the mode :attr:`StreamMode.READ` that has
+ similar API of :class:`StreamReader`. It can also be used as an async context manager.
+
+ *limit* determines the buffer size limit used by the returned :class:`Stream`
+ instance. By default the limit is set to 64 KiB.
+ .. versionadded:: 3.8
+
+.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)
+
+ Takes a :term:`file-like object <file object>` *pipe* to return a
+ :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has
+ similar API of :class:`StreamWriter`. It can also be used as an async context manager.
+
+ *limit* determines the buffer size limit used by the returned :class:`Stream`
+ instance. By default the limit is set to 64 KiB.
+
+ .. versionadded:: 3.8
.. rubric:: Unix Sockets
+.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
+ sock=None, server_hostname=None, \
+ ssl_handshake_timeout=None)
+
+ Establish a Unix socket connection to socket with *path* address and
+ return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE`
+ that can be used as a reader and a writer.
+
+ *limit* determines the buffer size limit used by the returned :class:`Stream`
+ instance. By default the *limit* is set to 64 KiB.
+
+ The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.
+
+ The function can be used with ``await`` to get a connected stream::
+
+ stream = await asyncio.connect_unix('/tmp/example.sock')
+
+ The function can also be used as an async context manager::
+
+ async with asyncio.connect_unix('/tmp/example.sock') as stream:
+ ...
+
+ .. availability:: Unix.
+
+ .. versionadded:: 3.8
+
.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
limit=None, ssl=None, sock=None, \
server_hostname=None, ssl_handshake_timeout=None)
.. deprecated-removed:: 3.8 3.10
- `open_unix_connection()` is deprecated if favor of `connect_unix()`.
+ ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`.
.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
.. deprecated-removed:: 3.8 3.10
- `start_unix_server()` is deprecated in favor of `UnixStreamServer()`.
+ ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.
---------
+StreamServer
+============
+
+.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
+ limit=2**16, family=socket.AF_UNSPEC, \
+ flags=socket.AI_PASSIVE, sock=None, backlog=100, \
+ ssl=None, reuse_address=None, reuse_port=None, \
+ ssl_handshake_timeout=None, shutdown_timeout=60)
+
+ The *client_connected_cb* callback is called whenever a new client
+ connection is established. It receives a :class:`Stream` object of the
+ mode :attr:`StreamMode.READWRITE`.
+
+ *client_connected_cb* can be a plain callable or a
+ :ref:`coroutine function <coroutine>`; if it is a coroutine function,
+ it will be automatically scheduled as a :class:`Task`.
+
+ *limit* determines the buffer size limit used by the
+ returned :class:`Stream` instance. By default the *limit*
+ is set to 64 KiB.
+
+ The rest of the arguments are passed directly to
+ :meth:`loop.create_server`.
+
+ .. coroutinemethod:: start_serving()
+
+ Binds to the given host and port to start the server.
+
+ .. coroutinemethod:: serve_forever()
+
+ Start accepting connections until the coroutine is cancelled.
+ Cancellation of ``serve_forever`` task causes the server
+ to be closed.
+
+ This method can be called if the server is already accepting
+ connections. Only one ``serve_forever`` task can exist per
+ one *Server* object.
+
+ .. method:: is_serving()
+
+ Returns ``True`` if the server is bound and currently serving.
+
+ .. method:: bind()
+
+ Bind the server to the given *host* and *port*. This method is
+ automatically called during ``__aenter__`` when :class:`StreamServer` is
+ used as an async context manager.
+
+ .. method:: is_bound()
+
+ Return ``True`` if the server is bound.
+
+ .. coroutinemethod:: abort()
+
+ Closes the connection and cancels all pending tasks.
+
+ .. coroutinemethod:: close()
+
+ Closes the connection. This method is automatically called during
+ ``__aexit__`` when :class:`StreamServer` is used as an async context
+ manager.
+
+ .. attribute:: sockets
+
+ Returns a tuple of socket objects the server is bound to.
+
+ .. versionadded:: 3.8
+
+
+UnixStreamServer
+================
+
+.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
+ limit=2**16, sock=None, backlog=100, \
+ ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)
+
+ The *client_connected_cb* callback is called whenever a new client
+ connection is established. It receives a :class:`Stream` object of the
+ mode :attr:`StreamMode.READWRITE`.
+
+ *client_connected_cb* can be a plain callable or a
+ :ref:`coroutine function <coroutine>`; if it is a coroutine function,
+ it will be automatically scheduled as a :class:`Task`.
+
+ *limit* determines the buffer size limit used by the
+ returned :class:`Stream` instance. By default the *limit*
+ is set to 64 KiB.
+
+ The rest of the arguments are passed directly to
+ :meth:`loop.create_unix_server`.
+
+ .. coroutinemethod:: start_serving()
+
+ Binds to the given host and port to start the server.
+
+ .. method:: is_serving()
+
+ Returns ``True`` if the server is bound and currently serving.
+
+ .. method:: bind()
+
+ Bind the server to the given *host* and *port*. This method is
+ automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
+ used as an async context manager.
+
+ .. method:: is_bound()
+
+ Return ``True`` if the server is bound.
+
+ .. coroutinemethod:: abort()
+
+ Closes the connection and cancels all pending tasks.
+
+ .. coroutinemethod:: close()
+
+ Closes the connection. This method is automatically called during
+ ``__aexit__`` when :class:`UnixStreamServer` is used as an async context
+ manager.
+
+ .. attribute:: sockets
+
+ Returns a tuple of socket objects the server is bound to.
+
+ .. availability:: Unix.
+
+ .. versionadded:: 3.8
+
+Stream
+======
+
+.. class:: Stream
+
+ Represents a Stream object that provides APIs to read and write data
+ to the IO stream . It includes the API provided by :class:`StreamReader`
+ and :class:`StreamWriter`.
+
+ Do not instantiate *Stream* objects directly; use API like :func:`connect`
+ and :class:`StreamServer` instead.
+
+ .. versionadded:: 3.8
+
+
+StreamMode
+==========
+
+.. class:: StreamMode
+
+ A subclass of :class:`enum.Flag` that defines a set of values that can be
+ used to determine the ``mode`` of :class:`Stream` objects.
+
+ .. data:: READ
+
+ The stream object is readable and provides the API of :class:`StreamReader`.
+
+ .. data:: WRITE
+
+ The stream object is writeable and provides the API of :class:`StreamWriter`.
+
+ .. data:: READWRITE
+
+ The stream object is readable and writeable and provides the API of both
+ :class:`StreamReader` and :class:`StreamWriter`.
+
+ .. versionadded:: 3.8
+
StreamReader
============
TCP echo client using streams
-----------------------------
-TCP echo client using the :func:`asyncio.open_connection` function::
+TCP echo client using the :func:`asyncio.connect` function::
import asyncio
async def tcp_echo_client(message):
- reader, writer = await asyncio.open_connection(
- '127.0.0.1', 8888)
-
- print(f'Send: {message!r}')
- writer.write(message.encode())
+ async with asyncio.connect('127.0.0.1', 8888) as stream:
+ print(f'Send: {message!r}')
+ await stream.write(message.encode())
- data = await reader.read(100)
- print(f'Received: {data.decode()!r}')
-
- print('Close the connection')
- writer.close()
+ data = await stream.read(100)
+ print(f'Received: {data.decode()!r}')
asyncio.run(tcp_echo_client('Hello World!'))
TCP echo server using streams
-----------------------------
-TCP echo server using the :func:`asyncio.start_server` function::
+TCP echo server using the :class:`asyncio.StreamServer` class::
import asyncio
- async def handle_echo(reader, writer):
- data = await reader.read(100)
+ async def handle_echo(stream):
+ data = await stream.read(100)
message = data.decode()
- addr = writer.get_extra_info('peername')
+ addr = stream.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
- writer.write(data)
- await writer.drain()
+ await stream.write(data)
print("Close the connection")
- writer.close()
+ await stream.close()
async def main():
- server = await asyncio.start_server(
- handle_echo, '127.0.0.1', 8888)
-
- addr = server.sockets[0].getsockname()
- print(f'Serving on {addr}')
-
- async with server:
+ async with asyncio.StreamServer(
+ handle_echo, '127.0.0.1', 8888) as server:
+ addr = server.sockets[0].getsockname()
+ print(f'Serving on {addr}')
await server.serve_forever()
asyncio.run(main())
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
- reader, writer = await asyncio.open_connection(
- url.hostname, 443, ssl=True)
+ stream = await asyncio.connect(url.hostname, 443, ssl=True)
else:
- reader, writer = await asyncio.open_connection(
- url.hostname, 80)
+ stream = await asyncio.connect(url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"\r\n"
)
- writer.write(query.encode('latin-1'))
- while True:
- line = await reader.readline()
- if not line:
- break
-
+ stream.write(query.encode('latin-1'))
+ while (line := await stream.readline()):
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
- writer.close()
+ await stream.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
------------------------------------------------------
Coroutine waiting until a socket receives data using the
-:func:`open_connection` function::
+:func:`asyncio.connect` function::
import asyncio
import socket
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
- reader, writer = await asyncio.open_connection(sock=rsock)
-
- # Simulate the reception of data from the network
- loop.call_soon(wsock.send, 'abc'.encode())
+ async with asyncio.connect(sock=rsock) as stream:
+ # Simulate the reception of data from the network
+ loop.call_soon(wsock.send, 'abc'.encode())
- # Wait for data
- data = await reader.read(100)
+ # Wait for data
+ data = await stream.read(100)
- # Got data, we are done: close the socket
- print("Received:", data.decode())
- writer.close()
+ # Got data, we are done: close the socket
+ print("Received:", data.decode())
# Close the second socket
wsock.close()