Add a LEV_OPT_THREADSAFE option for threadsafe evconnlisteners
authorNick Mathewson <nickm@torproject.org>
Thu, 23 Sep 2010 20:49:58 +0000 (16:49 -0400)
committerNick Mathewson <nickm@torproject.org>
Thu, 7 Oct 2010 22:11:19 +0000 (18:11 -0400)
include/event2/listener.h
listener.c
test/regress_listener.c

index 372043603fb22eea4a837c469f1da7a7a00774a4..dededc5060f8ac46d61a9875d9c26d25ba55f43d 100644 (file)
@@ -66,6 +66,9 @@ typedef void (*evconnlistener_errorcb)(struct evconnlistener *, void *);
 /** Flag: Indicates that we should disable the timeout (if any) between when
  * this socket is closed and when we can listen again on the same port. */
 #define LEV_OPT_REUSEABLE              (1u<<3)
+/** Flag: Indicates that the listener should be locked so it's safe to use
+ * from multiple threadcs at once. */
+#define LEV_OPT_THREADSAFE             (1u<<4)
 
 /**
    Allocate a new evconnlistener object to listen for incoming TCP connections
index a1b770514afa96e7ee87698a5d63a8c446cab181..840b09fdc528483d566149f47b7e7d77c30ad0bc 100644 (file)
@@ -51,6 +51,7 @@
 #include "mm-internal.h"
 #include "util-internal.h"
 #include "log-internal.h"
+#include "evthread-internal.h"
 #ifdef WIN32
 #include "iocp-internal.h"
 #include "defer-internal.h"
@@ -60,16 +61,19 @@ struct evconnlistener_ops {
        int (*enable)(struct evconnlistener *);
        int (*disable)(struct evconnlistener *);
        void (*destroy)(struct evconnlistener *);
+       void (*shutdown)(struct evconnlistener *);
        evutil_socket_t (*getfd)(struct evconnlistener *);
        struct event_base *(*getbase)(struct evconnlistener *);
 };
 
 struct evconnlistener {
        const struct evconnlistener_ops *ops;
+       void *lock;
        evconnlistener_cb cb;
        evconnlistener_errorcb errorcb;
        void *user_data;
        unsigned flags;
+       int refcnt;
 };
 
 struct evconnlistener_event {
@@ -83,12 +87,15 @@ struct evconnlistener_iocp {
        evutil_socket_t fd;
        struct event_base *event_base;
        struct event_iocp_port *port;
-       CRITICAL_SECTION lock;
-       int n_accepting;
+       short n_accepting;
+       short shutting_down;
        struct accepting_socket **accepting;
 };
 #endif
 
+#define LOCK(listener) EVLOCK_LOCK((listener)->lock, 0)
+#define UNLOCK(listener) EVLOCK_UNLOCK((listener)->lock, 0)
+
 struct evconnlistener *
 evconnlistener_new_async(struct event_base *base,
     evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
@@ -100,10 +107,36 @@ static void event_listener_destroy(struct evconnlistener *);
 static evutil_socket_t event_listener_getfd(struct evconnlistener *);
 static struct event_base *event_listener_getbase(struct evconnlistener *);
 
+#if 0
+static void
+listener_incref_and_lock(struct evconnlistener *listener)
+{
+       LOCK(listener);
+       ++listener->refcnt;
+}
+#endif
+
+static int
+listener_decref_and_unlock(struct evconnlistener *listener)
+{
+       int refcnt = --listener->refcnt;
+       if (refcnt == 0) {
+               listener->ops->destroy(listener);
+               UNLOCK(listener);
+               EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+               mm_free(listener);
+               return 1;
+       } else {
+               UNLOCK(listener);
+               return 0;
+       }
+}
+
 static const struct evconnlistener_ops evconnlistener_event_ops = {
        event_listener_enable,
        event_listener_disable,
        event_listener_destroy,
+       NULL, /* shutdown */
        event_listener_getfd,
        event_listener_getbase
 };
@@ -143,6 +176,11 @@ evconnlistener_new(struct event_base *base,
        lev->base.cb = cb;
        lev->base.user_data = ptr;
        lev->base.flags = flags;
+       lev->base.refcnt = 1;
+
+       if (flags & LEV_OPT_THREADSAFE) {
+               EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+       }
 
        event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
            listener_read_cb, lev);
@@ -204,8 +242,12 @@ evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,
 void
 evconnlistener_free(struct evconnlistener *lev)
 {
-       lev->ops->destroy(lev);
-       mm_free(lev);
+       LOCK(lev);
+       lev->cb = NULL;
+       lev->errorcb = NULL;
+       if (lev->ops->shutdown)
+               lev->ops->shutdown(lev);
+       listener_decref_and_unlock(lev);
 }
 
 static void
@@ -223,13 +265,21 @@ event_listener_destroy(struct evconnlistener *lev)
 int
 evconnlistener_enable(struct evconnlistener *lev)
 {
-       return lev->ops->enable(lev);
+       int r;
+       LOCK(lev);
+       r = lev->ops->enable(lev);
+       UNLOCK(lev);
+       return r;
 }
 
 int
 evconnlistener_disable(struct evconnlistener *lev)
 {
-       return lev->ops->disable(lev);
+       int r;
+       LOCK(lev);
+       r = lev->ops->disable(lev);
+       UNLOCK(lev);
+       return r;
 }
 
 static int
@@ -251,7 +301,11 @@ event_listener_disable(struct evconnlistener *lev)
 evutil_socket_t
 evconnlistener_get_fd(struct evconnlistener *lev)
 {
-       return lev->ops->getfd(lev);
+       evutil_socket_t fd;
+       LOCK(lev);
+       fd = lev->ops->getfd(lev);
+       UNLOCK(lev);
+       return fd;
 }
 
 static evutil_socket_t
@@ -265,7 +319,11 @@ event_listener_getfd(struct evconnlistener *lev)
 struct event_base *
 evconnlistener_get_base(struct evconnlistener *lev)
 {
-       return lev->ops->getbase(lev);
+       struct event_base *base;
+       LOCK(lev);
+       base = lev->ops->getbase(lev);
+       UNLOCK(lev);
+       return base;
 }
 
 static struct event_base *
@@ -279,7 +337,9 @@ event_listener_getbase(struct evconnlistener *lev)
 void evconnlistener_set_error_cb(struct evconnlistener *lev,
     evconnlistener_errorcb errorcb)
 {
+       LOCK(lev);
        lev->errorcb = errorcb;
+       UNLOCK(lev);
 }
 
 static void
@@ -287,6 +347,10 @@ listener_read_cb(evutil_socket_t fd, short what, void *p)
 {
        struct evconnlistener *lev = p;
        int err;
+       evconnlistener_cb cb;
+       evconnlistener_errorcb errorcb;
+       void *user_data;
+       LOCK(lev);
        while (1) {
                struct sockaddr_storage ss;
 #ifdef WIN32
@@ -301,16 +365,40 @@ listener_read_cb(evutil_socket_t fd, short what, void *p)
                if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))
                        evutil_make_socket_nonblocking(new_fd);
 
-               lev->cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
-                   lev->user_data);
+               if (lev->cb == NULL) {
+                       UNLOCK(lev);
+                       return;
+               }
+               ++lev->refcnt;
+               cb = lev->cb;
+               user_data = lev->user_data;
+               UNLOCK(lev);
+               cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
+                   user_data);
+               LOCK(lev);
+               if (lev->refcnt == 1) {
+                       int freed = listener_decref_and_unlock(lev);
+                       EVUTIL_ASSERT(freed);
+                       return;
+               }
+               --lev->refcnt;
        }
        err = evutil_socket_geterror(fd);
-       if (EVUTIL_ERR_ACCEPT_RETRIABLE(err))
+       if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) {
+               UNLOCK(lev);
                return;
-       if (lev->errorcb != NULL)
-               lev->errorcb(lev, lev->user_data);
-       else
+       }
+       if (lev->errorcb != NULL) {
+               ++lev->refcnt;
+               errorcb = lev->errorcb;
+               user_data = lev->user_data;
+               UNLOCK(lev);
+               errorcb(lev, user_data);
+               LOCK(lev);
+               listener_decref_and_unlock(lev);
+       } else {
                event_sock_warn(fd, "Error from accept() call");
+       }
 }
 
 #ifdef WIN32
@@ -318,6 +406,7 @@ struct accepting_socket {
        CRITICAL_SECTION lock;
        struct event_overlapped overlapped;
        SOCKET s;
+       int error;
        struct deferred_cb deferred;
        struct evconnlistener_iocp *lev;
        ev_uint8_t buflen;
@@ -382,8 +471,11 @@ start_accepting(struct accepting_socket *as)
        const struct win32_extension_fns *ext = event_get_win32_extension_fns();
        DWORD pending = 0;
        SOCKET s = socket(as->family, SOCK_STREAM, 0);
-       if (s == INVALID_SOCKET)
-               return -1;
+       int error = 0;
+       if (s == INVALID_SOCKET) {
+               error = WSAGetLastError();
+               goto report_err;
+       }
 
        setsockopt(s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
            (char *)&as->lev->fd, sizeof(&as->lev->fd));
@@ -404,14 +496,20 @@ start_accepting(struct accepting_socket *as)
                /* Immediate success! */
                accepted_socket_cb(&as->overlapped, 1, 0, 1);
        } else {
-               int err = WSAGetLastError();
-               if (err != ERROR_IO_PENDING) {
-                       event_warnx("AcceptEx: %s", evutil_socket_error_to_string(err));
-                       return -1;
+               error = WSAGetLastError();
+               if (error != ERROR_IO_PENDING) {
+                       goto report_err;
                }
        }
 
        return 0;
+
+report_err:
+       as->error = error;
+       event_deferred_cb_schedule(
+               event_base_get_deferred_cb_queue(as->lev->event_base),
+               &as->deferred);
+       return 0;
 }
 
 static void
@@ -424,32 +522,63 @@ stop_accepting(struct accepting_socket *as)
 }
 
 static void
-accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg)
+accepted_socket_invoke_user_cb(struct deferred_cb *dcb, void *arg)
 {
        struct accepting_socket *as = arg;
 
        struct sockaddr *sa_local=NULL, *sa_remote=NULL;
        int socklen_local=0, socklen_remote=0;
        const struct win32_extension_fns *ext = event_get_win32_extension_fns();
+       struct evconnlistener *lev = &as->lev->base;
+       evutil_socket_t sock=-1;
+       void *data;
+       evconnlistener_cb cb=NULL;
+       evconnlistener_errorcb errorcb=NULL;
+       int error;
 
        EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
 
+       LOCK(lev);
        EnterCriticalSection(&as->lock);
        if (as->free_on_cb) {
                free_and_unlock_accepting_socket(as);
+               listener_decref_and_unlock(lev);
                return;
        }
 
-       ext->GetAcceptExSockaddrs(
-               as->addrbuf, 0, as->buflen/2, as->buflen/2,
-               &sa_local, &socklen_local, &sa_remote, &socklen_remote);
+       ++lev->refcnt;
 
-       as->lev->base.cb(&as->lev->base, as->s, sa_remote,
-           socklen_remote, as->lev->base.user_data);
+       error = as->error;
+       if (error) {
+               as->error = 0;
+               errorcb = lev->errorcb;
+       } else {
+               ext->GetAcceptExSockaddrs(
+                       as->addrbuf, 0, as->buflen/2, as->buflen/2,
+                       &sa_local, &socklen_local, &sa_remote,
+                       &socklen_remote);
+               sock = as->s;
+               cb = lev->cb;
+               as->s = INVALID_SOCKET;
+       }
+       data = lev->user_data;
 
-       as->s = INVALID_SOCKET;
+       LeaveCriticalSection(&as->lock);
+       UNLOCK(lev);
+
+       if (errorcb) {
+               WSASetLastError(error);
+               errorcb(lev, data);
+       } else {
+               cb(lev, sock, sa_remote, socklen_remote, data);
+       }
+
+       LOCK(lev);
+       if (listener_decref_and_unlock(lev))
+               return;
 
-       start_accepting(as); /* XXXX handle error */
+       EnterCriticalSection(&as->lock);
+       start_accepting(as);
        LeaveCriticalSection(&as->lock);
 }
 
@@ -459,6 +588,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i
        struct accepting_socket *as =
            EVUTIL_UPCAST(o, struct accepting_socket, overlapped);
 
+       LOCK(&as->lev->base);
        EnterCriticalSection(&as->lock);
        if (ok) {
                /* XXXX Don't do this if some EV_MT flag is set. */
@@ -467,16 +597,32 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i
                        &as->deferred);
                LeaveCriticalSection(&as->lock);
        } else if (as->free_on_cb) {
+               struct evconnlistener *lev = &as->lev->base;
                free_and_unlock_accepting_socket(as);
+               listener_decref_and_unlock(lev);
+               return;
        } else if (as->s == INVALID_SOCKET) {
                /* This is okay; we were disabled by iocp_listener_disable. */
                LeaveCriticalSection(&as->lock);
        } else {
                /* Some error on accept that we couldn't actually handle. */
+               BOOL ok;
+               DWORD transfer = 0, flags=0;
                event_sock_warn(as->s, "Unexpected error on AcceptEx");
+               ok = WSAGetOverlappedResult(as->s, &o->overlapped,
+                   &transfer, FALSE, &flags);
+               if (ok) {
+                       /* well, that was confusing! */
+                       as->error = 1;
+               } else {
+                       as->error = WSAGetLastError();
+               }
+               event_deferred_cb_schedule(
+                       event_base_get_deferred_cb_queue(as->lev->event_base),
+                       &as->deferred);
                LeaveCriticalSection(&as->lock);
-               /* XXXX send error to user */
        }
+       UNLOCK(&as->lev->base);
 }
 
 static int
@@ -486,17 +632,17 @@ iocp_listener_enable(struct evconnlistener *lev)
        struct evconnlistener_iocp *lev_iocp =
            EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
 
-       EnterCriticalSection(&lev_iocp->lock);
+       LOCK(lev);
        for (i = 0; i < lev_iocp->n_accepting; ++i) {
                struct accepting_socket *as = lev_iocp->accepting[i];
                if (!as)
                        continue;
                EnterCriticalSection(&as->lock);
                if (!as->free_on_cb && as->s == INVALID_SOCKET)
-                       start_accepting(as); /* XXXX handle error */
+                       start_accepting(as);
                LeaveCriticalSection(&as->lock);
        }
-       LeaveCriticalSection(&lev_iocp->lock);
+       UNLOCK(lev);
        return 0;
 }
 
@@ -507,7 +653,7 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
        struct evconnlistener_iocp *lev_iocp =
            EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
 
-       EnterCriticalSection(&lev_iocp->lock);
+       LOCK(lev);
        for (i = 0; i < lev_iocp->n_accepting; ++i) {
                struct accepting_socket *as = lev_iocp->accepting[i];
                if (!as)
@@ -520,7 +666,7 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
                }
                LeaveCriticalSection(&as->lock);
        }
-       LeaveCriticalSection(&lev_iocp->lock);
+       UNLOCK(lev);
        return 0;
 }
 
@@ -529,10 +675,18 @@ iocp_listener_disable(struct evconnlistener *lev)
 {
        return iocp_listener_disable_impl(lev,0);
 }
+
 static void
 iocp_listener_destroy(struct evconnlistener *lev)
 {
-       iocp_listener_disable_impl(lev,1);
+       struct evconnlistener_iocp *lev_iocp =
+           EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
+
+       if (! lev_iocp->shutting_down) {
+               lev_iocp->shutting_down = 1;
+               iocp_listener_disable_impl(lev,1);
+       }
+
 }
 
 static evutil_socket_t
@@ -554,6 +708,7 @@ static const struct evconnlistener_ops evconnlistener_iocp_ops = {
        iocp_listener_enable,
        iocp_listener_disable,
        iocp_listener_destroy,
+       iocp_listener_destroy, /* shutdown */
        iocp_listener_getfd,
        iocp_listener_getbase
 };
@@ -571,6 +726,8 @@ evconnlistener_new_async(struct event_base *base,
        struct evconnlistener_iocp *lev;
        int i;
 
+       flags |= LEV_OPT_THREADSAFE;
+
        if (!base || !event_base_get_iocp(base))
                goto err;
 
@@ -595,6 +752,7 @@ evconnlistener_new_async(struct event_base *base,
        lev->base.cb = cb;
        lev->base.user_data = ptr;
        lev->base.flags = flags;
+       lev->base.refcnt = 1;
 
        lev->port = event_base_get_iocp(base);
        lev->fd = fd;
@@ -603,7 +761,7 @@ evconnlistener_new_async(struct event_base *base,
        if (event_iocp_port_associate(lev->port, fd, 1) < 0)
                goto err_free_lev;
 
-       InitializeCriticalSectionAndSpinCount(&lev->lock, 1000);
+       EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
 
        lev->n_accepting = N_SOCKETS_PER_LISTENER;
        lev->accepting = mm_calloc(lev->n_accepting,
@@ -624,6 +782,7 @@ evconnlistener_new_async(struct event_base *base,
                        free_and_unlock_accepting_socket(lev->accepting[i]);
                        goto err_free_accepting;
                }
+               ++lev->base.refcnt;
        }
 
        return &lev->base;
@@ -632,7 +791,7 @@ err_free_accepting:
        mm_free(lev->accepting);
        /* XXXX free the other elements. */
 err_delete_lock:
-       DeleteCriticalSection(&lev->lock);
+       EVTHREAD_FREE_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
 err_free_lev:
        mm_free(lev);
 err:
index 35cb8fcfbdc6bfe334b6e84a144ee86269628a83..ce7e064fc3c12e571beccea05f6d9c349f3e201b 100644 (file)
@@ -77,6 +77,10 @@ regress_pick_a_port(void *arg)
 
        evutil_socket_t fd1 = -1, fd2 = -1, fd3 = -1;
 
+       if (data->setup_data && strstr((char*)data->setup_data, "ts")) {
+               flags |= LEV_OPT_THREADSAFE;
+       }
+
        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
@@ -148,13 +152,18 @@ regress_listener_error(void *arg)
        struct event_base *base = data->base;
        struct evconnlistener *listener = NULL;
        int count = 1;
+       unsigned int flags = LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE;
+
+       if (data->setup_data && strstr((char*)data->setup_data, "ts")) {
+               flags |= LEV_OPT_THREADSAFE;
+       }
 
        /* send, so that pair[0] will look 'readable'*/
        send(data->pair[1], "hello", 5, 0);
 
        /* Start a listener with a bogus socket. */
        listener = evconnlistener_new(base, acceptcb, &count,
-           LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, 0,
+           flags, 0,
            data->pair[0]);
        tt_assert(listener);
 
@@ -166,7 +175,8 @@ regress_listener_error(void *arg)
        tt_int_op(count,==,1000); /* set by error cb */
 
 end:
-       evconnlistener_free(listener);
+       if (listener)
+               evconnlistener_free(listener);
 }
 
 struct testcase_t listener_testcases[] = {
@@ -174,10 +184,17 @@ struct testcase_t listener_testcases[] = {
        { "randport", regress_pick_a_port, TT_FORK|TT_NEED_BASE,
          &basic_setup, NULL},
 
+       { "randport_ts", regress_pick_a_port, TT_FORK|TT_NEED_BASE,
+         &basic_setup, (char*)"ts"},
+
        { "error", regress_listener_error,
          TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR,
          &basic_setup, NULL},
 
+       { "error_ts", regress_listener_error,
+         TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR,
+         &basic_setup, (char*)"ts"},
+
        END_OF_TESTCASES,
 };
 
@@ -187,7 +204,8 @@ struct testcase_t listener_iocp_testcases[] = {
          &basic_setup, NULL},
 
        { "error", regress_listener_error,
-         TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR|TT_ENABLE_IOCP,
+         TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR|TT_ENABLE_IOCP
+            |TT_SKIP/*Remove once err-handling on IOCP listeners is ok*/,
          &basic_setup, NULL},
 
        END_OF_TESTCASES,