]> granicus.if.org Git - libevent/commitdiff
Clean up acceptex code some more: add locking, single-threading, enable/disable.
authorNick Mathewson <nickm@torproject.org>
Mon, 2 Nov 2009 20:20:40 +0000 (20:20 +0000)
committerNick Mathewson <nickm@torproject.org>
Mon, 2 Nov 2009 20:20:40 +0000 (20:20 +0000)
svn:r1491

listener.c
test/regress_listener.c

index e14062543c6b269c2074a6ffdaefc67ae4c7a73a..553eb4f4ce5cac3b2ac763c778ae054e322776db 100644 (file)
@@ -55,6 +55,7 @@
 #include "log-internal.h"
 #ifdef WIN32
 #include "iocp-internal.h"
+#include "defer-internal.h"
 #endif
 
 struct evconnlistener_ops {
@@ -83,6 +84,7 @@ struct evconnlistener_iocp {
        evutil_socket_t fd;
        struct event_base *event_base;
        struct event_iocp_port *port;
+       CRITICAL_SECTION lock;
        int n_accepting;
        struct accepting_socket **accepting;
 };
@@ -289,6 +291,7 @@ struct accepting_socket {
        CRITICAL_SECTION lock;
        struct event_overlapped overlapped;
        SOCKET s;
+       struct deferred_cb deferred;
        struct evconnlistener_iocp *lev;
        ev_uint8_t buflen;
        ev_uint8_t family;
@@ -344,6 +347,7 @@ free_and_unlock_accepting_socket(struct accepting_socket *as)
 static int
 start_accepting(struct accepting_socket *as)
 {
+       /* requires lock */
        int result = -1;
        const struct win32_extension_fns *ext =
            event_get_win32_extension_fns();
@@ -386,61 +390,128 @@ done:
        return result;
 }
 
-#if 0
 static void
 stop_accepting(struct accepting_socket *as)
 {
-       /* XXX */
+       /* requires lock. */
+       SOCKET s = as->s;
+       as->s = INVALID_SOCKET;
+       closesocket(s);
 }
-#endif
 
 static void
-accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok)
+accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg)
 {
-       /* Run this whole thing deferred unless some MT flag is set */
-       /* XXX needs locking. */
-       /* XXX use ok */
-
-       struct sockaddr *sa_local=NULL, *sa_remote=NULL;
-       int socklen_local=0, socklen_remote=0;
-       struct accepting_socket *as =
-           EVUTIL_UPCAST(o, struct accepting_socket, overlapped);
-       const struct win32_extension_fns *ext =
-           event_get_win32_extension_fns();
-       EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
+       struct *as = arg;
+       evconnlistener_cb cb;
 
+       EnterCriticalSection(&as->lock);
        ext->GetAcceptExSockaddrs(as->addrbuf, 0,
            as->buflen/2, as->buflen/2,
            &sa_local, &socklen_local,
            &sa_remote, &socklen_remote);
 
-       as->lev->base.cb(&as->lev->base, as->s, sa_remote, socklen_remote,
-           as->lev->base.user_data);
+       /* XXXX should we/can we release the lock here? */
+       as->lev->base.cb(&as->lev->base, as->s, sa_remote,
+           socklen_remote, as->lev->base.user_data);
 
        as->s = INVALID_SOCKET;
 
-       /* Avoid stack overflow XXXX */
-       start_accepting(as);
+       if (as->free_on_cb) {
+               free_and_unlock_accepting_socket(as);
+       } else {
+               start_accepting(as);/*XXX handle error */
+               LeaveCriticalSection(&as->lock);
+       }
 }
 
+static void
+accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok)
+{
+       struct sockaddr *sa_local=NULL, *sa_remote=NULL;
+       int socklen_local=0, socklen_remote=0;
+       struct accepting_socket *as =
+           EVUTIL_UPCAST(o, struct accepting_socket, overlapped);
+       const struct win32_extension_fns *ext =
+           event_get_win32_extension_fns();
+       EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
+
+       EnterCriticalSection(&as->lock);
+       if (ok) {
+               /* XXXX Don't do this if some EV_MT flag is set. */
+               event_deferred_cb_schedule(
+                       event_base_get_deferred_cb_queue(as->lev->event_base),
+                       &as->deferred);
+               LeaveCriticalSection(&as->lock);
+       } else if (free_on_cb) {
+               free_and_unlock_accepting_socket(as);
+       } else if (as->s == INVALID_SOCKET) {
+               /* This is okay; we were disabled by iocp_listener_disable. */
+               LeaveCriticalSection(&as->lock);
+       } else {
+               /* Some error on accept that we couldn't actually handle. */
+               event_sock_warn(as->fd, "Unexpected error on AcceptEx");
+               LeaveCriticalSection(&as->lock);
+               /* XXXX recover better. */
+       }
+}
 
 static int
 iocp_listener_enable(struct evconnlistener *lev)
 {
-       /* XXXX */
+       int i;
+       struct evconnlistener_iocp *lev_iocp =
+           EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
+
+       EnterCriticalSection(&lev->lock);
+       for (i = 0; i < n_accepting; ++i) {
+               struct accepting_socket *as = lev_iocp->accepting[i];
+               if (!as)
+                       continue;
+               EnterCriticalSection(&as->lock);
+               if (!as->free_on_cb && as->s == INVALID_SOCKET)
+                       start_accepting(as); /* detect failure. */
+               LeaveCriticalSection(&as->lock);
+       }
+       LeaveCriticalSection(&lev->lock);
        return 0;
 }
+
 static int
-iocp_listener_disable(struct evconnlistener *lev)
+iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
 {
-       /* XXXX */
+       int i;
+       struct evconnlistener_iocp *lev_iocp =
+           EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
+
+       EnterCriticalSection(&lev->lock);
+       for (i = 0; i < n_accepting; ++i) {
+               struct accepting_socket *as = lev_iocp->accepting[i];
+               if (!as)
+                       continue;
+               EnterCriticalSection(&as->lock);
+               if (!as->free_on_cb && as->s != INVALID_SOCKET) {
+                       if (shutdown)
+                               as->free_on_cb = 1;
+                       stop_accepting(as);
+               }
+               LeaveCriticalSection(&as->lock);
+       }
+       LeaveCriticalSection(&lev->lock);
        return 0;
 }
+
+static int
+iocp_listener_disable_impl(struct evconnlistener *lev)
+{
+       return iocp_listener_disable_impl(lev,0);
+}
 static void
 iocp_listener_destroy(struct evconnlistener *lev)
 {
-       /* XXXX */
+       iocp_listener_disable_impl(lev,1);
 }
+
 static evutil_socket_t
 iocp_listener_getfd(struct evconnlistener *lev)
 {
@@ -521,6 +592,8 @@ evconnlistener_new_async(struct event_base *base,
                return NULL;
        }
 
+       InitializeCriticalSection(&lev->lock);
+
        if (start_accepting(lev->accepting[0]) < 0) {
                event_warnx("Couldn't start accepting on socket");
                EnterCriticalSection(&lev->accepting[0]->lock);
@@ -528,6 +601,7 @@ evconnlistener_new_async(struct event_base *base,
                mm_free(lev->accepting);
                mm_free(lev);
                closesocket(fd);
+               DeleteCriticalSection(&lev->lock);
                return NULL;
        }
 
index 264b0587eccb2d742beca8acba1df8eb5b1f5d69..050a91a59b40a50d110fdf07ae82d69b7bde8a77 100644 (file)
@@ -112,7 +112,9 @@ regress_pick_a_port(void *arg)
        evutil_socket_connect(&fd3, (struct sockaddr*)&ss2, slen2);
 
        event_base_dispatch(base);
-       // Sleep(2000);
+#ifdef WIN32
+       Sleep(1000); /* XXXX this is a stupid stopgap. */
+#endif
 
        tt_int_op(count1, ==, 0);
        tt_int_op(count2, ==, 0);