From 525f40d231aba2c004619fc7a5207171ed65b0cb Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 19 Oct 2017 21:46:40 +0200 Subject: [PATCH] bpo-31819: Add AbstractEventLoop.sock_recv_into() (#4051) * bpo-31819: Add AbstractEventLoop.sock_recv_into() * Add NEWS * Add doc --- Doc/library/asyncio-eventloop.rst | 15 ++ Lib/asyncio/events.py | 3 + Lib/asyncio/proactor_events.py | 3 + Lib/asyncio/selector_events.py | 35 +++ Lib/asyncio/windows_events.py | 22 ++ Lib/test/test_asyncio/test_events.py | 26 +++ Lib/test/test_asyncio/test_proactor_events.py | 5 + .../2017-10-19-20-03-13.bpo-31819.mw2wF9.rst | 1 + Modules/overlapped.c | 214 +++++++++++++----- 9 files changed, 266 insertions(+), 58 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-10-19-20-03-13.bpo-31819.mw2wF9.rst diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 83bbb70b03..ade37390c6 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -554,6 +554,21 @@ Low-level socket operations This method is a :ref:`coroutine `. +.. coroutinemethod:: AbstractEventLoop.sock_recv_into(sock, buf) + + Receive data from the socket. Modeled after blocking + :meth:`socket.socket.recv_into` method. + + The received data is written into *buf* (a writable buffer). + The return value is the number of bytes written. + + With :class:`SelectorEventLoop` event loop, the socket *sock* must be + non-blocking. + + This method is a :ref:`coroutine `. + + .. versionadded:: 3.7 + .. coroutinemethod:: AbstractEventLoop.sock_sendall(sock, data) Send data to the socket. Modeled after blocking diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 03af6994e9..0dbd92cf3a 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -461,6 +461,9 @@ class AbstractEventLoop: def sock_recv(self, sock, nbytes): raise NotImplementedError + def sock_recv_into(self, sock, buf): + raise NotImplementedError + def sock_sendall(self, sock, data): raise NotImplementedError diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 642f61e356..5e7a39727c 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -439,6 +439,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): def sock_recv(self, sock, n): return self._proactor.recv(sock, n) + def sock_recv_into(self, sock, buf): + return self._proactor.recv_into(sock, buf) + def sock_sendall(self, sock, data): return self._proactor.send(sock, data) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 4b403560c3..7143ca2660 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -386,6 +386,41 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: fut.set_result(data) + def sock_recv_into(self, sock, buf): + """Receive data from the socket. + + The received data is written into *buf* (a writable buffer). + The return value is the number of bytes written. + + This method is a coroutine. + """ + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + fut = self.create_future() + self._sock_recv_into(fut, False, sock, buf) + return fut + + def _sock_recv_into(self, fut, registered, sock, buf): + # _sock_recv_into() can add itself as an I/O callback if the operation + # can't be done immediately. Don't use it directly, call sock_recv_into(). + fd = sock.fileno() + if registered: + # Remove the callback early. It should be rare that the + # selector says the fd is ready but the call still returns + # EAGAIN, and I am willing to take a hit in that case in + # order to simplify the common case. + self.remove_reader(fd) + if fut.cancelled(): + return + try: + nbytes = sock.recv_into(buf) + except (BlockingIOError, InterruptedError): + self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf) + except Exception as exc: + fut.set_exception(exc) + else: + fut.set_result(nbytes) + def sock_sendall(self, sock, data): """Send data to the socket. diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index b777dd065a..6045ba029e 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -448,6 +448,28 @@ class IocpProactor: return self._register(ov, conn, finish_recv) + def recv_into(self, conn, buf, flags=0): + self._register_with_iocp(conn) + ov = _overlapped.Overlapped(NULL) + try: + if isinstance(conn, socket.socket): + ov.WSARecvInto(conn.fileno(), buf, flags) + else: + ov.ReadFileInto(conn.fileno(), buf) + except BrokenPipeError: + return self._result(b'') + + def finish_recv(trans, key, ov): + try: + return ov.getresult() + except OSError as exc: + if exc.winerror == _overlapped.ERROR_NETNAME_DELETED: + raise ConnectionResetError(*exc.args) + else: + raise + + return self._register(ov, conn, finish_recv) + def send(self, conn, buf, flags=0): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 736f703c2f..0ea9c08674 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -425,6 +425,9 @@ class EventLoopTestsMixin: with self.assertRaises(ValueError): self.loop.run_until_complete( self.loop.sock_recv(sock, 1024)) + with self.assertRaises(ValueError): + self.loop.run_until_complete( + self.loop.sock_recv_into(sock, bytearray())) with self.assertRaises(ValueError): self.loop.run_until_complete( self.loop.sock_accept(sock)) @@ -443,16 +446,37 @@ class EventLoopTestsMixin: sock.close() self.assertTrue(data.startswith(b'HTTP/1.0 200 OK')) + def _basetest_sock_recv_into(self, httpd, sock): + # same as _basetest_sock_client_ops, but using sock_recv_into + sock.setblocking(False) + self.loop.run_until_complete( + self.loop.sock_connect(sock, httpd.address)) + self.loop.run_until_complete( + self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) + data = bytearray(1024) + with memoryview(data) as buf: + nbytes = self.loop.run_until_complete( + self.loop.sock_recv_into(sock, buf[:1024])) + # consume data + self.loop.run_until_complete( + self.loop.sock_recv_into(sock, buf[nbytes:])) + sock.close() + self.assertTrue(data.startswith(b'HTTP/1.0 200 OK')) + def test_sock_client_ops(self): with test_utils.run_test_server() as httpd: sock = socket.socket() self._basetest_sock_client_ops(httpd, sock) + sock = socket.socket() + self._basetest_sock_recv_into(httpd, sock) @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') def test_unix_sock_client_ops(self): with test_utils.run_test_unix_server() as httpd: sock = socket.socket(socket.AF_UNIX) self._basetest_sock_client_ops(httpd, sock) + sock = socket.socket(socket.AF_UNIX) + self._basetest_sock_recv_into(httpd, sock) def test_sock_client_fail(self): # Make sure that we will get an unused port @@ -2612,6 +2636,8 @@ class AbstractEventLoopTests(unittest.TestCase): NotImplementedError, loop.remove_writer, 1) self.assertRaises( NotImplementedError, loop.sock_recv, f, 10) + self.assertRaises( + NotImplementedError, loop.sock_recv_into, f, 10) self.assertRaises( NotImplementedError, loop.sock_sendall, f, 10) self.assertRaises( diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index d76da661ab..7a8b52352d 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -489,6 +489,11 @@ class BaseProactorEventLoopTests(test_utils.TestCase): self.loop.sock_recv(self.sock, 1024) self.proactor.recv.assert_called_with(self.sock, 1024) + def test_sock_recv_into(self): + buf = bytearray(10) + self.loop.sock_recv_into(self.sock, buf) + self.proactor.recv_into.assert_called_with(self.sock, buf) + def test_sock_sendall(self): self.loop.sock_sendall(self.sock, b'data') self.proactor.send.assert_called_with(self.sock, b'data') diff --git a/Misc/NEWS.d/next/Library/2017-10-19-20-03-13.bpo-31819.mw2wF9.rst b/Misc/NEWS.d/next/Library/2017-10-19-20-03-13.bpo-31819.mw2wF9.rst new file mode 100644 index 0000000000..7bdb0d9f88 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-10-19-20-03-13.bpo-31819.mw2wF9.rst @@ -0,0 +1 @@ +Add AbstractEventLoop.sock_recv_into(). diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 6099d46643..9355ce60f4 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -37,8 +37,8 @@ #define T_HANDLE T_POINTER -enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_WRITE, TYPE_ACCEPT, - TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE, +enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_READINTO, TYPE_WRITE, + TYPE_ACCEPT, TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE, TYPE_WAIT_NAMED_PIPE_AND_CONNECT}; typedef struct { @@ -51,10 +51,10 @@ typedef struct { /* Type of operation */ DWORD type; union { - /* Buffer used for reading: TYPE_READ and TYPE_ACCEPT */ - PyObject *read_buffer; - /* Buffer used for writing: TYPE_WRITE */ - Py_buffer write_buffer; + /* Buffer allocated by us: TYPE_READ and TYPE_ACCEPT */ + PyObject *allocated_buffer; + /* Buffer passed by the user: TYPE_WRITE and TYPE_READINTO */ + Py_buffer user_buffer; }; } OverlappedObject; @@ -550,9 +550,9 @@ Overlapped_new(PyTypeObject *type, PyObject *args, PyObject *kwds) self->handle = NULL; self->error = 0; self->type = TYPE_NONE; - self->read_buffer = NULL; + self->allocated_buffer = NULL; memset(&self->overlapped, 0, sizeof(OVERLAPPED)); - memset(&self->write_buffer, 0, sizeof(Py_buffer)); + memset(&self->user_buffer, 0, sizeof(Py_buffer)); if (event) self->overlapped.hEvent = event; return (PyObject *)self; @@ -597,11 +597,12 @@ Overlapped_dealloc(OverlappedObject *self) switch (self->type) { case TYPE_READ: case TYPE_ACCEPT: - Py_CLEAR(self->read_buffer); + Py_CLEAR(self->allocated_buffer); break; case TYPE_WRITE: - if (self->write_buffer.obj) - PyBuffer_Release(&self->write_buffer); + case TYPE_READINTO: + if (self->user_buffer.obj) + PyBuffer_Release(&self->user_buffer); break; } PyObject_Del(self); @@ -676,7 +677,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) case ERROR_MORE_DATA: break; case ERROR_BROKEN_PIPE: - if ((self->type == TYPE_READ || self->type == TYPE_ACCEPT) && self->read_buffer != NULL) + if (self->type == TYPE_READ || self->type == TYPE_READINTO) break; /* fall through */ default: @@ -685,17 +686,45 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) switch (self->type) { case TYPE_READ: - assert(PyBytes_CheckExact(self->read_buffer)); - if (transferred != PyBytes_GET_SIZE(self->read_buffer) && - _PyBytes_Resize(&self->read_buffer, transferred)) + assert(PyBytes_CheckExact(self->allocated_buffer)); + if (transferred != PyBytes_GET_SIZE(self->allocated_buffer) && + _PyBytes_Resize(&self->allocated_buffer, transferred)) return NULL; - Py_INCREF(self->read_buffer); - return self->read_buffer; + Py_INCREF(self->allocated_buffer); + return self->allocated_buffer; default: return PyLong_FromUnsignedLong((unsigned long) transferred); } } +static PyObject * +do_ReadFile(OverlappedObject *self, HANDLE handle, + char *bufstart, DWORD buflen) +{ + DWORD nread; + int ret; + DWORD err; + + Py_BEGIN_ALLOW_THREADS + ret = ReadFile(handle, bufstart, buflen, &nread, + &self->overlapped); + Py_END_ALLOW_THREADS + + self->error = err = ret ? ERROR_SUCCESS : GetLastError(); + switch (err) { + case ERROR_BROKEN_PIPE: + mark_as_completed(&self->overlapped); + return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); + } +} + PyDoc_STRVAR( Overlapped_ReadFile_doc, "ReadFile(handle, size) -> Overlapped[message]\n\n" @@ -706,10 +735,7 @@ Overlapped_ReadFile(OverlappedObject *self, PyObject *args) { HANDLE handle; DWORD size; - DWORD nread; PyObject *buf; - BOOL ret; - DWORD err; if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &handle, &size)) return NULL; @@ -728,14 +754,66 @@ Overlapped_ReadFile(OverlappedObject *self, PyObject *args) self->type = TYPE_READ; self->handle = handle; - self->read_buffer = buf; + self->allocated_buffer = buf; + + return do_ReadFile(self, handle, PyBytes_AS_STRING(buf), size); +} + +PyDoc_STRVAR( + Overlapped_ReadFileInto_doc, + "ReadFileInto(handle, buf) -> Overlapped[bytes_transferred]\n\n" + "Start overlapped receive"); + +static PyObject * +Overlapped_ReadFileInto(OverlappedObject *self, PyObject *args) +{ + HANDLE handle; + PyObject *bufobj; + + if (!PyArg_ParseTuple(args, F_HANDLE "O", &handle, &bufobj)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + if (!PyArg_Parse(bufobj, "y*", &self->user_buffer)) + return NULL; + +#if SIZEOF_SIZE_T > SIZEOF_LONG + if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { + PyBuffer_Release(&self->user_buffer); + PyErr_SetString(PyExc_ValueError, "buffer too large"); + return NULL; + } +#endif + + self->type = TYPE_READINTO; + self->handle = handle; + + return do_ReadFile(self, handle, self->user_buffer.buf, + (DWORD)self->user_buffer.len); +} + +static PyObject * +do_WSARecv(OverlappedObject *self, HANDLE handle, + char *bufstart, DWORD buflen, DWORD flags) +{ + DWORD nread; + WSABUF wsabuf; + int ret; + DWORD err; + + wsabuf.buf = bufstart; + wsabuf.len = buflen; Py_BEGIN_ALLOW_THREADS - ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, - &self->overlapped); + ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags, + &self->overlapped, NULL); Py_END_ALLOW_THREADS - self->error = err = ret ? ERROR_SUCCESS : GetLastError(); + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); switch (err) { case ERROR_BROKEN_PIPE: mark_as_completed(&self->overlapped); @@ -761,11 +839,7 @@ Overlapped_WSARecv(OverlappedObject *self, PyObject *args) HANDLE handle; DWORD size; DWORD flags = 0; - DWORD nread; PyObject *buf; - WSABUF wsabuf; - int ret; - DWORD err; if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD, &handle, &size, &flags)) @@ -785,28 +859,48 @@ Overlapped_WSARecv(OverlappedObject *self, PyObject *args) self->type = TYPE_READ; self->handle = handle; - self->read_buffer = buf; - wsabuf.len = size; - wsabuf.buf = PyBytes_AS_STRING(buf); + self->allocated_buffer = buf; - Py_BEGIN_ALLOW_THREADS - ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags, - &self->overlapped, NULL); - Py_END_ALLOW_THREADS + return do_WSARecv(self, handle, PyBytes_AS_STRING(buf), size, flags); +} - self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); - switch (err) { - case ERROR_BROKEN_PIPE: - mark_as_completed(&self->overlapped); - return SetFromWindowsErr(err); - case ERROR_SUCCESS: - case ERROR_MORE_DATA: - case ERROR_IO_PENDING: - Py_RETURN_NONE; - default: - self->type = TYPE_NOT_STARTED; - return SetFromWindowsErr(err); +PyDoc_STRVAR( + Overlapped_WSARecvInto_doc, + "WSARecvInto(handle, buf, flags) -> Overlapped[bytes_transferred]\n\n" + "Start overlapped receive"); + +static PyObject * +Overlapped_WSARecvInto(OverlappedObject *self, PyObject *args) +{ + HANDLE handle; + PyObject *bufobj; + DWORD flags; + + if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD, + &handle, &bufobj, &flags)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + if (!PyArg_Parse(bufobj, "y*", &self->user_buffer)) + return NULL; + +#if SIZEOF_SIZE_T > SIZEOF_LONG + if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { + PyBuffer_Release(&self->user_buffer); + PyErr_SetString(PyExc_ValueError, "buffer too large"); + return NULL; } +#endif + + self->type = TYPE_READINTO; + self->handle = handle; + + return do_WSARecv(self, handle, self->user_buffer.buf, + (DWORD)self->user_buffer.len, flags); } PyDoc_STRVAR( @@ -831,12 +925,12 @@ Overlapped_WriteFile(OverlappedObject *self, PyObject *args) return NULL; } - if (!PyArg_Parse(bufobj, "y*", &self->write_buffer)) + if (!PyArg_Parse(bufobj, "y*", &self->user_buffer)) return NULL; #if SIZEOF_SIZE_T > SIZEOF_LONG - if (self->write_buffer.len > (Py_ssize_t)ULONG_MAX) { - PyBuffer_Release(&self->write_buffer); + if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { + PyBuffer_Release(&self->user_buffer); PyErr_SetString(PyExc_ValueError, "buffer too large"); return NULL; } @@ -846,8 +940,8 @@ Overlapped_WriteFile(OverlappedObject *self, PyObject *args) self->handle = handle; Py_BEGIN_ALLOW_THREADS - ret = WriteFile(handle, self->write_buffer.buf, - (DWORD)self->write_buffer.len, + ret = WriteFile(handle, self->user_buffer.buf, + (DWORD)self->user_buffer.len, &written, &self->overlapped); Py_END_ALLOW_THREADS @@ -887,12 +981,12 @@ Overlapped_WSASend(OverlappedObject *self, PyObject *args) return NULL; } - if (!PyArg_Parse(bufobj, "y*", &self->write_buffer)) + if (!PyArg_Parse(bufobj, "y*", &self->user_buffer)) return NULL; #if SIZEOF_SIZE_T > SIZEOF_LONG - if (self->write_buffer.len > (Py_ssize_t)ULONG_MAX) { - PyBuffer_Release(&self->write_buffer); + if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { + PyBuffer_Release(&self->user_buffer); PyErr_SetString(PyExc_ValueError, "buffer too large"); return NULL; } @@ -900,8 +994,8 @@ Overlapped_WSASend(OverlappedObject *self, PyObject *args) self->type = TYPE_WRITE; self->handle = handle; - wsabuf.len = (DWORD)self->write_buffer.len; - wsabuf.buf = self->write_buffer.buf; + wsabuf.len = (DWORD)self->user_buffer.len; + wsabuf.buf = self->user_buffer.buf; Py_BEGIN_ALLOW_THREADS ret = WSASend((SOCKET)handle, &wsabuf, 1, &written, flags, @@ -951,7 +1045,7 @@ Overlapped_AcceptEx(OverlappedObject *self, PyObject *args) self->type = TYPE_ACCEPT; self->handle = (HANDLE)ListenSocket; - self->read_buffer = buf; + self->allocated_buffer = buf; Py_BEGIN_ALLOW_THREADS ret = Py_AcceptEx(ListenSocket, AcceptSocket, PyBytes_AS_STRING(buf), @@ -1193,8 +1287,12 @@ static PyMethodDef Overlapped_methods[] = { METH_NOARGS, Overlapped_cancel_doc}, {"ReadFile", (PyCFunction) Overlapped_ReadFile, METH_VARARGS, Overlapped_ReadFile_doc}, + {"ReadFileInto", (PyCFunction) Overlapped_ReadFileInto, + METH_VARARGS, Overlapped_ReadFileInto_doc}, {"WSARecv", (PyCFunction) Overlapped_WSARecv, METH_VARARGS, Overlapped_WSARecv_doc}, + {"WSARecvInto", (PyCFunction) Overlapped_WSARecvInto, + METH_VARARGS, Overlapped_WSARecvInto_doc}, {"WriteFile", (PyCFunction) Overlapped_WriteFile, METH_VARARGS, Overlapped_WriteFile_doc}, {"WSASend", (PyCFunction) Overlapped_WSASend, -- 2.40.0