]> granicus.if.org Git - python/commitdiff
Issue #23293, asyncio: Rewrite IocpProactor.connect_pipe()
authorVictor Stinner <victor.stinner@gmail.com>
Thu, 22 Jan 2015 21:55:08 +0000 (22:55 +0100)
committerVictor Stinner <victor.stinner@gmail.com>
Thu, 22 Jan 2015 21:55:08 +0000 (22:55 +0100)
Add _overlapped.ConnectPipe() which tries to connect to the pipe for
asynchronous I/O (overlapped): call CreateFile() in a loop until it doesn't
fail with ERROR_PIPE_BUSY. Use an increasing delay between 1 ms and 100 ms.

Remove Overlapped.WaitNamedPipeAndConnect() which is no more used.

Lib/asyncio/windows_events.py
Modules/overlapped.c

index 315455aa7c4f9fbe981e4fafef6bc3616d669f97..7d0dbe9d01dff676a586908a111f2ccdaea93842 100644 (file)
@@ -29,6 +29,12 @@ INFINITE = 0xffffffff
 ERROR_CONNECTION_REFUSED = 1225
 ERROR_CONNECTION_ABORTED = 1236
 
+# Initial delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_INIT_DELAY = 0.001
+
+# Maximum delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_MAX_DELAY = 0.100
+
 
 class _OverlappedFuture(futures.Future):
     """Subclass of Future which represents an overlapped operation.
@@ -495,25 +501,28 @@ class IocpProactor:
         return self._register(ov, pipe, finish_accept_pipe,
                               register=False)
 
-    def connect_pipe(self, address):
-        ov = _overlapped.Overlapped(NULL)
-        ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
-
-        def finish_connect_pipe(err, handle, ov):
-            # err, handle were arguments passed to PostQueuedCompletionStatus()
-            # in a function run in a thread pool.
-            if err == _overlapped.ERROR_SEM_TIMEOUT:
-                # Connection did not succeed within time limit.
-                msg = _overlapped.FormatMessage(err)
-                raise ConnectionRefusedError(0, msg, None, err)
-            elif err != 0:
-                msg = _overlapped.FormatMessage(err)
-                raise OSError(0, msg, None, err)
+    def _connect_pipe(self, fut, address, delay):
+        # Unfortunately there is no way to do an overlapped connect to a pipe.
+        # Call CreateFile() in a loop until it doesn't fail with
+        # ERROR_PIPE_BUSY
+        try:
+            handle = _overlapped.ConnectPipe(address)
+        except OSError as exc:
+            if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
+                # Polling: retry later
+                delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
+                self._loop.call_later(delay,
+                                      self._connect_pipe, fut, address, delay)
             else:
-                return windows_utils.PipeHandle(handle)
+                fut.set_exception(exc)
+        else:
+            pipe = windows_utils.PipeHandle(handle)
+            fut.set_result(pipe)
 
-        return self._register(ov, None, finish_connect_pipe,
-                              wait_for_post=True)
+    def connect_pipe(self, address):
+        fut = futures.Future(loop=self._loop)
+        self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
+        return fut
 
     def wait_for_handle(self, handle, timeout=None):
         """Wait for a handle.
index d22c626ecf84f142b704c59716c0a23475197b13..8fe2e247bc137a6cc04a565689181cf129c13ffa 100644 (file)
@@ -52,12 +52,6 @@ typedef struct {
     };
 } OverlappedObject;
 
-typedef struct {
-    OVERLAPPED *Overlapped;
-    HANDLE IocpHandle;
-    char Address[1];
-} WaitNamedPipeAndConnectContext;
-
 /*
  * Map Windows error codes to subclasses of OSError
  */
@@ -1133,99 +1127,33 @@ Overlapped_ConnectNamedPipe(OverlappedObject *self, PyObject *args)
     }
 }
 
-/* Unfortunately there is no way to do an overlapped connect to a
-   pipe.  We instead use WaitNamedPipe() and CreateFile() in a thread
-   pool thread.  If a connection succeeds within a time limit (10
-   seconds) then PostQueuedCompletionStatus() is used to return the
-   pipe handle to the completion port. */
-
-static DWORD WINAPI
-WaitNamedPipeAndConnectInThread(WaitNamedPipeAndConnectContext *ctx)
-{
-    HANDLE PipeHandle = INVALID_HANDLE_VALUE;
-    DWORD Start = GetTickCount();
-    DWORD Deadline = Start + 10*1000;
-    DWORD Error = 0;
-    DWORD Timeout;
-    BOOL Success;
-
-    for ( ; ; ) {
-        Timeout = Deadline - GetTickCount();
-        if ((int)Timeout < 0)
-            break;
-        Success = WaitNamedPipe(ctx->Address, Timeout);
-        Error = Success ? ERROR_SUCCESS : GetLastError();
-        switch (Error) {
-            case ERROR_SUCCESS:
-                PipeHandle = CreateFile(ctx->Address,
-                                        GENERIC_READ | GENERIC_WRITE,
-                                        0, NULL, OPEN_EXISTING,
-                                        FILE_FLAG_OVERLAPPED, NULL);
-                if (PipeHandle == INVALID_HANDLE_VALUE)
-                    continue;
-                break;
-            case ERROR_SEM_TIMEOUT:
-                continue;
-        }
-        break;
-    }
-    if (!PostQueuedCompletionStatus(ctx->IocpHandle, Error,
-                                    (ULONG_PTR)PipeHandle, ctx->Overlapped))
-        CloseHandle(PipeHandle);
-    free(ctx);
-    return 0;
-}
-
 PyDoc_STRVAR(
-    Overlapped_WaitNamedPipeAndConnect_doc,
-    "WaitNamedPipeAndConnect(addr, iocp_handle) -> Overlapped[pipe_handle]\n\n"
-    "Start overlapped connection to address, notifying iocp_handle when\n"
-    "finished");
+    ConnectPipe_doc,
+    "ConnectPipe(addr) -> pipe_handle\n\n"
+    "Connect to the pipe for asynchronous I/O (overlapped).");
 
 static PyObject *
-Overlapped_WaitNamedPipeAndConnect(OverlappedObject *self, PyObject *args)
+ConnectPipe(OverlappedObject *self, PyObject *args)
 {
-    char *Address;
-    Py_ssize_t AddressLength;
-    HANDLE IocpHandle;
-    OVERLAPPED Overlapped;
-    BOOL ret;
-    DWORD err;
-    WaitNamedPipeAndConnectContext *ctx;
-    Py_ssize_t ContextLength;
+    PyObject *AddressObj;
+    wchar_t *Address;
+    HANDLE PipeHandle;
 
-    if (!PyArg_ParseTuple(args, "s#" F_HANDLE F_POINTER,
-                          &Address, &AddressLength, &IocpHandle, &Overlapped))
+    if (!PyArg_ParseTuple(args, "U",  &AddressObj))
         return NULL;
 
-    if (self->type != TYPE_NONE) {
-        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+    Address = PyUnicode_AsWideCharString(AddressObj, NULL);
+    if (Address == NULL)
         return NULL;
-    }
 
-    ContextLength = (AddressLength +
-                     offsetof(WaitNamedPipeAndConnectContext, Address));
-    ctx = calloc(1, ContextLength + 1);
-    if (ctx == NULL)
-        return PyErr_NoMemory();
-    memcpy(ctx->Address, Address, AddressLength + 1);
-    ctx->Overlapped = &self->overlapped;
-    ctx->IocpHandle = IocpHandle;
-
-    self->type = TYPE_WAIT_NAMED_PIPE_AND_CONNECT;
-    self->handle = NULL;
-
-    Py_BEGIN_ALLOW_THREADS
-    ret = QueueUserWorkItem(WaitNamedPipeAndConnectInThread, ctx,
-                            WT_EXECUTELONGFUNCTION);
-    Py_END_ALLOW_THREADS
-
-    mark_as_completed(&self->overlapped);
-
-    self->error = err = ret ? ERROR_SUCCESS : GetLastError();
-    if (!ret)
-        return SetFromWindowsErr(err);
-    Py_RETURN_NONE;
+    PipeHandle = CreateFileW(Address,
+                             GENERIC_READ | GENERIC_WRITE,
+                             0, NULL, OPEN_EXISTING,
+                             FILE_FLAG_OVERLAPPED, NULL);
+    PyMem_Free(Address);
+    if (PipeHandle == INVALID_HANDLE_VALUE)
+        return SetFromWindowsErr(0);
+    return Py_BuildValue(F_HANDLE, PipeHandle);
 }
 
 static PyObject*
@@ -1262,9 +1190,6 @@ static PyMethodDef Overlapped_methods[] = {
      METH_VARARGS, Overlapped_DisconnectEx_doc},
     {"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe,
      METH_VARARGS, Overlapped_ConnectNamedPipe_doc},
-    {"WaitNamedPipeAndConnect",
-     (PyCFunction) Overlapped_WaitNamedPipeAndConnect,
-     METH_VARARGS, Overlapped_WaitNamedPipeAndConnect_doc},
     {NULL}
 };
 
@@ -1350,6 +1275,9 @@ static PyMethodDef overlapped_functions[] = {
      METH_VARARGS, SetEvent_doc},
     {"ResetEvent", overlapped_ResetEvent,
      METH_VARARGS, ResetEvent_doc},
+    {"ConnectPipe",
+     (PyCFunction) ConnectPipe,
+     METH_VARARGS, ConnectPipe_doc},
     {NULL}
 };
 
@@ -1394,6 +1322,7 @@ PyInit__overlapped(void)
     WINAPI_CONSTANT(F_DWORD,  ERROR_IO_PENDING);
     WINAPI_CONSTANT(F_DWORD,  ERROR_NETNAME_DELETED);
     WINAPI_CONSTANT(F_DWORD,  ERROR_SEM_TIMEOUT);
+    WINAPI_CONSTANT(F_DWORD,  ERROR_PIPE_BUSY);
     WINAPI_CONSTANT(F_DWORD,  INFINITE);
     WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
     WINAPI_CONSTANT(F_HANDLE, NULL);