#include <windows.h>
#include <process.h>
+#include <stdio.h>
#include "event2/util.h"
#include "util-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
+#define NOTIFICATION_KEY ((ULONG_PTR)-1)
+
void
event_overlapped_init(struct event_overlapped *o, iocp_callback cb)
{
ULONG_PTR key;
DWORD bytes;
long ms = port->ms;
+ HANDLE p = port->port;
if (ms <= 0)
ms = INFINITE;
-
- while (GetQueuedCompletionStatus(port->port, &bytes, &key,
+ while (GetQueuedCompletionStatus(p, &bytes, &key,
&overlapped, ms)) {
- if (port->shutdown)
+ EnterCriticalSection(&port->lock);
+ if (port->shutdown) {
+ if (--port->n_live_threads == 0)
+ ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
+ LeaveCriticalSection(&port->lock);
return;
- handle_entry(overlapped, key, bytes);
+ }
+ LeaveCriticalSection(&port->lock);
+
+ if (key != NOTIFICATION_KEY)
+ handle_entry(overlapped, key, bytes);
}
event_warnx("GetQueuedCompletionStatus exited with no event.");
+ EnterCriticalSection(&port->lock);
+ if (--port->n_live_threads == 0)
+ ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
+ LeaveCriticalSection(&port->lock);
}
int
event_iocp_port_launch(void)
{
struct event_iocp_port *port;
- int thread, i;
+ int i;
if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
return NULL;
port->n_threads = 2;
- port->port = CreateIoCompletionPort(NULL, NULL, 0, port->n_threads);
+ port->threads = calloc(port->n_threads, sizeof(HANDLE));
+ if (!port->threads)
+ goto err;
+
+ port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, port->n_threads);
port->ms = -1;
if (!port->port)
- mm_free(port);
+ goto err;
+
+ port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL);
+ if (!port->shutdownSemaphore)
+ goto err;
+
+ for (i=0; i<port->n_threads; ++i) {
+ uintptr_t th = _beginthread(loop, 0, port);
+ if (th == (uintptr_t)-1)
+ goto err;
+ port->threads[i] = (HANDLE)th;
+ ++port->n_live_threads;
+ }
- for (i=0; i<port->n_threads; ++i)
- thread = _beginthread(loop, 0, port);
+ InitializeCriticalSection(&port->lock);
return port;
+err:
+ if (port->port)
+ CloseHandle(port->port);
+ if (port->threads)
+ mm_free(port->threads);
+ if (port->shutdownSemaphore)
+ CloseHandle(port->shutdownSemaphore);
+ mm_free(port);
+ return NULL;
}
+static void
+_event_iocp_port_unlock_and_free(struct event_iocp_port *port)
+{
+ DeleteCriticalSection(&port->lock);
+ CloseHandle(port->port);
+ CloseHandle(port->shutdownSemaphore);
+ mm_free(port->threads);
+ mm_free(port);
+}
-void
-event_iocp_shutdown(struct event_iocp_port *port)
+static int
+event_iocp_notify_all(struct event_iocp_port *port)
+{
+ int i, r, ok=1;
+ for (i=0; i<port->n_threads; ++i) {
+ r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY,
+ NULL);
+ if (!r)
+ ok = 0;
+ }
+ return ok ? 0 : -1;
+}
+
+int
+event_iocp_shutdown(struct event_iocp_port *port, long waitMsec)
{
+ int n;
+ EnterCriticalSection(&port->lock);
port->shutdown = 1;
- /* XXX notify. */
+ LeaveCriticalSection(&port->lock);
+ event_iocp_notify_all(port);
+
+ WaitForSingleObject(port->shutdownSemaphore, waitMsec);
+ EnterCriticalSection(&port->lock);
+ n = port->n_live_threads;
+ LeaveCriticalSection(&port->lock);
+ if (n == 0) {
+ _event_iocp_port_unlock_and_free(port);
+ return 0;
+ } else {
+ return -1;
+ }
+}
+
+int
+event_iocp_activate_overlapped(
+ struct event_iocp_port *port, struct event_overlapped *o,
+ uintptr_t key, ev_uint32_t n)
+{
+ BOOL r;
+
+ r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped);
+ return (r==0) ? -1 : 0;
}
struct event_iocp_port {
/** The port itself */
HANDLE port;
- /** Number of threads open on the port. */
- int n_threads;
+ /* A lock to cover internal structures. */
+ CRITICAL_SECTION lock;
+ /** Number of threads ever open on the port. */
+ short n_threads;
/** True iff we're shutting down all the threads on this port */
- int shutdown;
+ short shutdown;
/** How often the threads on this port check for shutdown and other
* conditions */
long ms;
+ /* The threads that are waiting for events. */
+ HANDLE *threads;
+ /** Number of threads currently open on this port. */
+ short n_live_threads;
+ /* A semaphore to signal when we are done shutting down. */
+ HANDLE *shutdownSemaphore;
+};
+#else
+/* Dummy definition so we can test-compile more things on unix. */
+struct event_overlapped {
+ iocp_callback cb;
};
#endif
int event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd,
uintptr_t key);
-/** Shut down all threads serving an iocp. */
-void event_iocp_shutdown(struct event_iocp_port *port);
+/** Tell all threads serving an iocp to stop. Wait for up to waitMsec for all
+ the threads to finish whatever they're doing. If all the threads are
+ done, free the port and return 0. Otherwise, return -1. If you get a -1
+ return value, it is safe to call this function again.
+*/
+int event_iocp_shutdown(struct event_iocp_port *port, long waitMsec);
+
+/* FIXME document. */
+int event_iocp_activate_overlapped(struct event_iocp_port *port,
+ struct event_overlapped *o,
+ uintptr_t key, ev_uint32_t n_bytes);
+
#ifdef __cplusplus
}
--- /dev/null
+/*
+ * Copyright (c) 2009 Niels Provos and Nick Mathewson
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdlib.h>
+#include <event2/event.h>
+#include <event2/thread.h>
+
+#include "tinytest.h"
+#include "tinytest_macros.h"
+
+#include "iocp-internal.h"
+#include "evthread-internal.h"
+
+#define MAX_CALLS 16
+struct dummy_overlapped {
+ struct event_overlapped eo;
+ void *lock;
+ int call_count;
+ uintptr_t keys[MAX_CALLS];
+ ssize_t sizes[MAX_CALLS];
+};
+
+static void
+dummy_cb(struct event_overlapped *o, uintptr_t key, ssize_t n)
+{
+ struct dummy_overlapped *d_o =
+ EVUTIL_UPCAST(o, struct dummy_overlapped, eo);
+
+ EVLOCK_LOCK(d_o->lock, EVTHREAD_WRITE);
+ if (d_o->call_count < MAX_CALLS) {
+ d_o->keys[d_o->call_count] = key;
+ d_o->sizes[d_o->call_count] = n;
+ }
+ d_o->call_count++;
+ EVLOCK_UNLOCK(d_o->lock, EVTHREAD_WRITE);
+}
+
+static int
+pair_is_in(struct dummy_overlapped *o, uintptr_t key, ssize_t n)
+{
+ int i;
+ int result = 0;
+ EVLOCK_LOCK(o->lock, EVTHREAD_WRITE);
+ for (i=0; i < o->call_count; ++i) {
+ if (o->keys[i] == key && o->sizes[i] == n) {
+ result = 1;
+ break;
+ }
+ }
+ EVLOCK_UNLOCK(o->lock, EVTHREAD_WRITE);
+ return result;
+}
+
+static void
+test_iocp_port(void *loop)
+{
+ struct event_iocp_port *port = NULL;
+ struct dummy_overlapped o1, o2;
+
+#ifdef WIN32
+ evthread_use_windows_threads();
+#endif
+ memset(&o1, 0, sizeof(o1));
+ memset(&o2, 0, sizeof(o2));
+
+ EVTHREAD_ALLOC_LOCK(o1.lock);
+ EVTHREAD_ALLOC_LOCK(o2.lock);
+
+ tt_assert(o1.lock);
+ tt_assert(o2.lock);
+
+ event_overlapped_init(&o1.eo, dummy_cb);
+ event_overlapped_init(&o2.eo, dummy_cb);
+
+ port = event_iocp_port_launch();
+ tt_assert(port);
+
+ tt_assert(!event_iocp_activate_overlapped(port, &o1.eo, 10, 105));
+ tt_assert(!event_iocp_activate_overlapped(port, &o2.eo, 25, 205));
+
+#ifdef WIN32
+ /* FIXME Be smarter. */
+ Sleep(1000);
+#endif
+
+ tt_want(!event_iocp_shutdown(port, 2000));
+
+ tt_int_op(o1.call_count, ==, 1);
+ tt_int_op(o2.call_count, ==, 1);
+ tt_want(pair_is_in(&o1, 10, 105));
+ tt_want(pair_is_in(&o2, 25, 205));
+
+end:
+ /* FIXME free the locks. */
+ ;
+}
+
+struct testcase_t iocp_testcases[] = {
+ { "iocp_port", test_iocp_port, TT_FORK, NULL, NULL },
+ END_OF_TESTCASES
+};