]> granicus.if.org Git - libevent/commitdiff
First tests for IOCP loop, and related fixes.
authorNick Mathewson <nickm@torproject.org>
Thu, 30 Apr 2009 19:04:44 +0000 (19:04 +0000)
committerNick Mathewson <nickm@torproject.org>
Thu, 30 Apr 2009 19:04:44 +0000 (19:04 +0000)
The fixes are: a shutdown mode that works, and a way to activate an
arbitrary event_overlapped.

svn:r1254

event_iocp.c
iocp-internal.h
test/regress_iocp.c [new file with mode: 0644]

index f3c1c5c9132f1e05ddb322ee146cbfc80a02a3ab..32d446292e2d84b0def95d102d1edcff3af7f10e 100644 (file)
@@ -26,6 +26,7 @@
 
 #include <windows.h>
 #include <process.h>
+#include <stdio.h>
 
 #include "event2/util.h"
 #include "util-internal.h"
@@ -33,6 +34,8 @@
 #include "log-internal.h"
 #include "mm-internal.h"
 
+#define NOTIFICATION_KEY ((ULONG_PTR)-1)
+
 void
 event_overlapped_init(struct event_overlapped *o, iocp_callback cb)
 {
@@ -56,18 +59,30 @@ loop(void *_port)
        ULONG_PTR key;
        DWORD bytes;
        long ms = port->ms;
+       HANDLE p = port->port;
 
        if (ms <= 0)
                ms = INFINITE;
 
-
-       while (GetQueuedCompletionStatus(port->port, &bytes, &key,
+       while (GetQueuedCompletionStatus(p, &bytes, &key,
                &overlapped, ms)) {
-               if (port->shutdown)
+               EnterCriticalSection(&port->lock);
+               if (port->shutdown) {
+                       if (--port->n_live_threads == 0)
+                               ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
+                       LeaveCriticalSection(&port->lock);
                        return;
-               handle_entry(overlapped, key, bytes);
+               }
+               LeaveCriticalSection(&port->lock);
+
+               if (key != NOTIFICATION_KEY)
+                       handle_entry(overlapped, key, bytes);
        }
        event_warnx("GetQueuedCompletionStatus exited with no event.");
+       EnterCriticalSection(&port->lock);
+       if (--port->n_live_threads == 0)
+               ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
+       LeaveCriticalSection(&port->lock);
 }
 
 int
@@ -85,26 +100,97 @@ struct event_iocp_port *
 event_iocp_port_launch(void)
 {
        struct event_iocp_port *port;
-       int thread, i;
+       int i;
 
        if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
                return NULL;
        port->n_threads = 2;
-       port->port = CreateIoCompletionPort(NULL, NULL, 0, port->n_threads);
+       port->threads = calloc(port->n_threads, sizeof(HANDLE));
+       if (!port->threads)
+               goto err;
+
+       port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, port->n_threads);
        port->ms = -1;
        if (!port->port)
-               mm_free(port);
+               goto err;
+
+       port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL);
+       if (!port->shutdownSemaphore)
+               goto err;
+
+       for (i=0; i<port->n_threads; ++i) {
+               uintptr_t th = _beginthread(loop, 0, port);
+               if (th == (uintptr_t)-1)
+                       goto err;
+               port->threads[i] = (HANDLE)th;
+               ++port->n_live_threads;
+       }
 
-       for (i=0; i<port->n_threads; ++i)
-               thread = _beginthread(loop, 0, port);
+       InitializeCriticalSection(&port->lock);
 
        return port;
+err:
+       if (port->port)
+               CloseHandle(port->port);
+       if (port->threads)
+               mm_free(port->threads);
+       if (port->shutdownSemaphore)
+               CloseHandle(port->shutdownSemaphore);
+       mm_free(port);
+       return NULL;
 }
 
+static void
+_event_iocp_port_unlock_and_free(struct event_iocp_port *port)
+{
+       DeleteCriticalSection(&port->lock);
+       CloseHandle(port->port);
+       CloseHandle(port->shutdownSemaphore);
+       mm_free(port->threads);
+       mm_free(port);
+}
 
-void
-event_iocp_shutdown(struct event_iocp_port *port)
+static int
+event_iocp_notify_all(struct event_iocp_port *port)
+{
+       int i, r, ok=1;
+       for (i=0; i<port->n_threads; ++i) {
+               r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY,
+                   NULL);
+               if (!r)
+                       ok = 0;
+       }
+       return ok ? 0 : -1;
+}
+
+int
+event_iocp_shutdown(struct event_iocp_port *port, long waitMsec)
 {
+       int n;
+       EnterCriticalSection(&port->lock);
        port->shutdown = 1;
-       /* XXX notify. */
+       LeaveCriticalSection(&port->lock);
+       event_iocp_notify_all(port);
+
+       WaitForSingleObject(port->shutdownSemaphore, waitMsec);
+       EnterCriticalSection(&port->lock);
+       n = port->n_live_threads;
+       LeaveCriticalSection(&port->lock);
+       if (n == 0) {
+               _event_iocp_port_unlock_and_free(port);
+               return 0;
+       } else {
+               return -1;
+       }
+}
+
+int
+event_iocp_activate_overlapped(
+    struct event_iocp_port *port, struct event_overlapped *o,
+    uintptr_t key, ev_uint32_t n)
+{
+       BOOL r;
+
+       r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped);
+       return (r==0) ? -1 : 0;
 }
index c374f6124b189e2e105e737ebcdcff0a56a65760..de7311559422324b09eafb2005be418ed7e0a26e 100644 (file)
@@ -60,13 +60,26 @@ struct event_overlapped {
 struct event_iocp_port {
        /** The port itself */
        HANDLE port;
-       /** Number of threads open on the port. */
-       int n_threads;
+       /* A lock to cover internal structures. */
+       CRITICAL_SECTION lock;
+       /** Number of threads ever open on the port. */
+       short n_threads;
        /** True iff we're shutting down all the threads on this port */
-       int shutdown;
+       short shutdown;
        /** How often the threads on this port check for shutdown and other
         * conditions */
        long ms;
+       /* The threads that are waiting for events. */
+       HANDLE *threads;
+       /** Number of threads currently open on this port. */
+       short n_live_threads;
+       /* A semaphore to signal when we are done shutting down. */
+       HANDLE *shutdownSemaphore;
+};
+#else
+/* Dummy definition so we can test-compile more things on unix. */
+struct event_overlapped {
+       iocp_callback cb;
 };
 #endif
 
@@ -120,8 +133,18 @@ struct event_iocp_port *event_iocp_port_launch(void);
 int event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd,
     uintptr_t key);
 
-/** Shut down all threads serving an iocp. */
-void event_iocp_shutdown(struct event_iocp_port *port);
+/** Tell all threads serving an iocp to stop.  Wait for up to waitMsec for all
+    the threads to finish whatever they're doing.  If all the threads are
+    done, free the port and return 0.  Otherwise, return -1.  If you get a -1
+    return value, it is safe to call this function again.
+*/
+int event_iocp_shutdown(struct event_iocp_port *port, long waitMsec);
+
+/* FIXME document. */
+int event_iocp_activate_overlapped(struct event_iocp_port *port,
+    struct event_overlapped *o,
+    uintptr_t key, ev_uint32_t n_bytes);
+
 
 #ifdef __cplusplus
 }
diff --git a/test/regress_iocp.c b/test/regress_iocp.c
new file mode 100644 (file)
index 0000000..6649263
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2009 Niels Provos and Nick Mathewson
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdlib.h>
+#include <event2/event.h>
+#include <event2/thread.h>
+
+#include "tinytest.h"
+#include "tinytest_macros.h"
+
+#include "iocp-internal.h"
+#include "evthread-internal.h"
+
+#define MAX_CALLS 16
+struct dummy_overlapped {
+       struct event_overlapped eo;
+       void *lock;
+       int call_count;
+       uintptr_t keys[MAX_CALLS];
+       ssize_t sizes[MAX_CALLS];
+};
+
+static void
+dummy_cb(struct event_overlapped *o, uintptr_t key, ssize_t n)
+{
+       struct dummy_overlapped *d_o =
+           EVUTIL_UPCAST(o, struct dummy_overlapped, eo);
+
+       EVLOCK_LOCK(d_o->lock, EVTHREAD_WRITE);
+       if (d_o->call_count < MAX_CALLS) {
+               d_o->keys[d_o->call_count] = key;
+               d_o->sizes[d_o->call_count] = n;
+       }
+       d_o->call_count++;
+       EVLOCK_UNLOCK(d_o->lock, EVTHREAD_WRITE);
+}
+
+static int
+pair_is_in(struct dummy_overlapped *o, uintptr_t key, ssize_t n)
+{
+       int i;
+       int result = 0;
+       EVLOCK_LOCK(o->lock, EVTHREAD_WRITE);
+       for (i=0; i < o->call_count; ++i) {
+               if (o->keys[i] == key && o->sizes[i] == n) {
+                       result = 1;
+                       break;
+               }
+       }
+       EVLOCK_UNLOCK(o->lock, EVTHREAD_WRITE);
+       return result;
+}
+
+static void
+test_iocp_port(void *loop)
+{
+       struct event_iocp_port *port = NULL;
+       struct dummy_overlapped o1, o2;
+
+#ifdef WIN32
+       evthread_use_windows_threads();
+#endif
+       memset(&o1, 0, sizeof(o1));
+       memset(&o2, 0, sizeof(o2));
+
+       EVTHREAD_ALLOC_LOCK(o1.lock);
+       EVTHREAD_ALLOC_LOCK(o2.lock);
+
+       tt_assert(o1.lock);
+       tt_assert(o2.lock);
+
+       event_overlapped_init(&o1.eo, dummy_cb);
+       event_overlapped_init(&o2.eo, dummy_cb);
+
+       port = event_iocp_port_launch();
+       tt_assert(port);
+
+       tt_assert(!event_iocp_activate_overlapped(port, &o1.eo, 10, 105));
+       tt_assert(!event_iocp_activate_overlapped(port, &o2.eo, 25, 205));
+
+#ifdef WIN32
+       /* FIXME Be smarter. */
+       Sleep(1000);
+#endif
+
+       tt_want(!event_iocp_shutdown(port, 2000));
+
+       tt_int_op(o1.call_count, ==, 1);
+       tt_int_op(o2.call_count, ==, 1);
+       tt_want(pair_is_in(&o1, 10, 105));
+       tt_want(pair_is_in(&o2, 25, 205));
+
+end:
+       /* FIXME free the locks. */
+       ;
+}
+
+struct testcase_t iocp_testcases[] = {
+       { "iocp_port", test_iocp_port, TT_FORK, NULL, NULL },
+       END_OF_TESTCASES
+};