From: Nick Mathewson Date: Thu, 30 Apr 2009 19:04:44 +0000 (+0000) Subject: First tests for IOCP loop, and related fixes. X-Git-Tag: release-2.0.3-alpha~259 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=f1090833b2557bc18353aee293a61ee556da80df;p=libevent First tests for IOCP loop, and related fixes. The fixes are: a shutdown mode that works, and a way to activate an arbitrary event_overlapped. svn:r1254 --- diff --git a/event_iocp.c b/event_iocp.c index f3c1c5c9..32d44629 100644 --- a/event_iocp.c +++ b/event_iocp.c @@ -26,6 +26,7 @@ #include #include +#include #include "event2/util.h" #include "util-internal.h" @@ -33,6 +34,8 @@ #include "log-internal.h" #include "mm-internal.h" +#define NOTIFICATION_KEY ((ULONG_PTR)-1) + void event_overlapped_init(struct event_overlapped *o, iocp_callback cb) { @@ -56,18 +59,30 @@ loop(void *_port) ULONG_PTR key; DWORD bytes; long ms = port->ms; + HANDLE p = port->port; if (ms <= 0) ms = INFINITE; - - while (GetQueuedCompletionStatus(port->port, &bytes, &key, + while (GetQueuedCompletionStatus(p, &bytes, &key, &overlapped, ms)) { - if (port->shutdown) + EnterCriticalSection(&port->lock); + if (port->shutdown) { + if (--port->n_live_threads == 0) + ReleaseSemaphore(port->shutdownSemaphore, 1, NULL); + LeaveCriticalSection(&port->lock); return; - handle_entry(overlapped, key, bytes); + } + LeaveCriticalSection(&port->lock); + + if (key != NOTIFICATION_KEY) + handle_entry(overlapped, key, bytes); } event_warnx("GetQueuedCompletionStatus exited with no event."); + EnterCriticalSection(&port->lock); + if (--port->n_live_threads == 0) + ReleaseSemaphore(port->shutdownSemaphore, 1, NULL); + LeaveCriticalSection(&port->lock); } int @@ -85,26 +100,97 @@ struct event_iocp_port * event_iocp_port_launch(void) { struct event_iocp_port *port; - int thread, i; + int i; if (!(port = mm_calloc(1, sizeof(struct event_iocp_port)))) return NULL; port->n_threads = 2; - port->port = CreateIoCompletionPort(NULL, NULL, 0, port->n_threads); + port->threads = calloc(port->n_threads, sizeof(HANDLE)); + if (!port->threads) + goto err; + + port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, port->n_threads); port->ms = -1; if (!port->port) - mm_free(port); + goto err; + + port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL); + if (!port->shutdownSemaphore) + goto err; + + for (i=0; in_threads; ++i) { + uintptr_t th = _beginthread(loop, 0, port); + if (th == (uintptr_t)-1) + goto err; + port->threads[i] = (HANDLE)th; + ++port->n_live_threads; + } - for (i=0; in_threads; ++i) - thread = _beginthread(loop, 0, port); + InitializeCriticalSection(&port->lock); return port; +err: + if (port->port) + CloseHandle(port->port); + if (port->threads) + mm_free(port->threads); + if (port->shutdownSemaphore) + CloseHandle(port->shutdownSemaphore); + mm_free(port); + return NULL; } +static void +_event_iocp_port_unlock_and_free(struct event_iocp_port *port) +{ + DeleteCriticalSection(&port->lock); + CloseHandle(port->port); + CloseHandle(port->shutdownSemaphore); + mm_free(port->threads); + mm_free(port); +} -void -event_iocp_shutdown(struct event_iocp_port *port) +static int +event_iocp_notify_all(struct event_iocp_port *port) +{ + int i, r, ok=1; + for (i=0; in_threads; ++i) { + r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY, + NULL); + if (!r) + ok = 0; + } + return ok ? 0 : -1; +} + +int +event_iocp_shutdown(struct event_iocp_port *port, long waitMsec) { + int n; + EnterCriticalSection(&port->lock); port->shutdown = 1; - /* XXX notify. */ + LeaveCriticalSection(&port->lock); + event_iocp_notify_all(port); + + WaitForSingleObject(port->shutdownSemaphore, waitMsec); + EnterCriticalSection(&port->lock); + n = port->n_live_threads; + LeaveCriticalSection(&port->lock); + if (n == 0) { + _event_iocp_port_unlock_and_free(port); + return 0; + } else { + return -1; + } +} + +int +event_iocp_activate_overlapped( + struct event_iocp_port *port, struct event_overlapped *o, + uintptr_t key, ev_uint32_t n) +{ + BOOL r; + + r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped); + return (r==0) ? -1 : 0; } diff --git a/iocp-internal.h b/iocp-internal.h index c374f612..de731155 100644 --- a/iocp-internal.h +++ b/iocp-internal.h @@ -60,13 +60,26 @@ struct event_overlapped { struct event_iocp_port { /** The port itself */ HANDLE port; - /** Number of threads open on the port. */ - int n_threads; + /* A lock to cover internal structures. */ + CRITICAL_SECTION lock; + /** Number of threads ever open on the port. */ + short n_threads; /** True iff we're shutting down all the threads on this port */ - int shutdown; + short shutdown; /** How often the threads on this port check for shutdown and other * conditions */ long ms; + /* The threads that are waiting for events. */ + HANDLE *threads; + /** Number of threads currently open on this port. */ + short n_live_threads; + /* A semaphore to signal when we are done shutting down. */ + HANDLE *shutdownSemaphore; +}; +#else +/* Dummy definition so we can test-compile more things on unix. */ +struct event_overlapped { + iocp_callback cb; }; #endif @@ -120,8 +133,18 @@ struct event_iocp_port *event_iocp_port_launch(void); int event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd, uintptr_t key); -/** Shut down all threads serving an iocp. */ -void event_iocp_shutdown(struct event_iocp_port *port); +/** Tell all threads serving an iocp to stop. Wait for up to waitMsec for all + the threads to finish whatever they're doing. If all the threads are + done, free the port and return 0. Otherwise, return -1. If you get a -1 + return value, it is safe to call this function again. +*/ +int event_iocp_shutdown(struct event_iocp_port *port, long waitMsec); + +/* FIXME document. */ +int event_iocp_activate_overlapped(struct event_iocp_port *port, + struct event_overlapped *o, + uintptr_t key, ev_uint32_t n_bytes); + #ifdef __cplusplus } diff --git a/test/regress_iocp.c b/test/regress_iocp.c new file mode 100644 index 00000000..6649263e --- /dev/null +++ b/test/regress_iocp.c @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2009 Niels Provos and Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include + +#include "tinytest.h" +#include "tinytest_macros.h" + +#include "iocp-internal.h" +#include "evthread-internal.h" + +#define MAX_CALLS 16 +struct dummy_overlapped { + struct event_overlapped eo; + void *lock; + int call_count; + uintptr_t keys[MAX_CALLS]; + ssize_t sizes[MAX_CALLS]; +}; + +static void +dummy_cb(struct event_overlapped *o, uintptr_t key, ssize_t n) +{ + struct dummy_overlapped *d_o = + EVUTIL_UPCAST(o, struct dummy_overlapped, eo); + + EVLOCK_LOCK(d_o->lock, EVTHREAD_WRITE); + if (d_o->call_count < MAX_CALLS) { + d_o->keys[d_o->call_count] = key; + d_o->sizes[d_o->call_count] = n; + } + d_o->call_count++; + EVLOCK_UNLOCK(d_o->lock, EVTHREAD_WRITE); +} + +static int +pair_is_in(struct dummy_overlapped *o, uintptr_t key, ssize_t n) +{ + int i; + int result = 0; + EVLOCK_LOCK(o->lock, EVTHREAD_WRITE); + for (i=0; i < o->call_count; ++i) { + if (o->keys[i] == key && o->sizes[i] == n) { + result = 1; + break; + } + } + EVLOCK_UNLOCK(o->lock, EVTHREAD_WRITE); + return result; +} + +static void +test_iocp_port(void *loop) +{ + struct event_iocp_port *port = NULL; + struct dummy_overlapped o1, o2; + +#ifdef WIN32 + evthread_use_windows_threads(); +#endif + memset(&o1, 0, sizeof(o1)); + memset(&o2, 0, sizeof(o2)); + + EVTHREAD_ALLOC_LOCK(o1.lock); + EVTHREAD_ALLOC_LOCK(o2.lock); + + tt_assert(o1.lock); + tt_assert(o2.lock); + + event_overlapped_init(&o1.eo, dummy_cb); + event_overlapped_init(&o2.eo, dummy_cb); + + port = event_iocp_port_launch(); + tt_assert(port); + + tt_assert(!event_iocp_activate_overlapped(port, &o1.eo, 10, 105)); + tt_assert(!event_iocp_activate_overlapped(port, &o2.eo, 25, 205)); + +#ifdef WIN32 + /* FIXME Be smarter. */ + Sleep(1000); +#endif + + tt_want(!event_iocp_shutdown(port, 2000)); + + tt_int_op(o1.call_count, ==, 1); + tt_int_op(o2.call_count, ==, 1); + tt_want(pair_is_in(&o1, 10, 105)); + tt_want(pair_is_in(&o2, 25, 205)); + +end: + /* FIXME free the locks. */ + ; +} + +struct testcase_t iocp_testcases[] = { + { "iocp_port", test_iocp_port, TT_FORK, NULL, NULL }, + END_OF_TESTCASES +};