]> granicus.if.org Git - libevent/commitdiff
Some IOCP bufferevent tweaks.
authorChristopher Davis <chrisd@torproject.org>
Tue, 17 Aug 2010 12:02:00 +0000 (05:02 -0700)
committerChristopher Davis <chrisd@torproject.org>
Wed, 8 Sep 2010 08:22:22 +0000 (01:22 -0700)
- Increment reference count of bufferevents before initiating overlapped
  operations to prevent the destructor from being called while operations
  are pending. The only portable way of canceling overlapped ops is to
  close the socket.

- Translate error codes to WSA* codes.

- Better handling of errors.

- Add an interface to add and del "virtual" events. Because IOCP
  bufferevents don't register any events with the base, the event loop
  has no way of knowing they exist. This causes the loop to terminate
  prematurely. event_base_{add,del}_virtual increment/decrement base's
  event count so the loop runs while there are any enabled IOCP
  bufferevents.

bufferevent_async.c
event-internal.h
event.c
event_iocp.c

index 23b636d20f147c014592b0b1d20732189f87f084..fa1bbc921112b93ef5515767e159dc514534e94e 100644 (file)
 #include <ws2tcpip.h>
 #endif
 
+#include <sys/queue.h>
+
 #include "event2/util.h"
 #include "event2/bufferevent.h"
 #include "event2/buffer.h"
 #include "event2/bufferevent_struct.h"
 #include "event2/event.h"
+#include "event2/util.h"
+#include "event-internal.h"
 #include "log-internal.h"
 #include "mm-internal.h"
 #include "bufferevent-internal.h"
@@ -74,6 +78,8 @@ struct bufferevent_async {
        unsigned read_in_progress : 1;
        unsigned write_in_progress : 1;
        unsigned ok : 1;
+       unsigned read_added : 1;
+       unsigned write_added : 1;
 };
 
 const struct bufferevent_ops bufferevent_ops_async = {
@@ -125,74 +131,143 @@ upcast_write(struct event_overlapped *eo)
 }
 
 static void
-bev_async_consider_writing(struct bufferevent_async *b)
+bev_async_del_write(struct bufferevent_async *beva)
+{
+       struct bufferevent *bev = &beva->bev.bev;
+
+       if (beva->write_added) {
+               beva->write_added = 0;
+               event_base_del_virtual(bev->ev_base);
+       }
+}
+
+static void
+bev_async_del_read(struct bufferevent_async *beva)
+{
+       struct bufferevent *bev = &beva->bev.bev;
+
+       if (beva->read_added) {
+               beva->read_added = 0;
+               event_base_del_virtual(bev->ev_base);
+       }
+}
+
+static void
+bev_async_add_write(struct bufferevent_async *beva)
+{
+       struct bufferevent *bev = &beva->bev.bev;
+
+       if (!beva->write_added) {
+               beva->write_added = 1;
+               event_base_add_virtual(bev->ev_base);
+       }
+}
+
+static void
+bev_async_add_read(struct bufferevent_async *beva)
+{
+       struct bufferevent *bev = &beva->bev.bev;
+
+       if (!beva->read_added) {
+               beva->read_added = 1;
+               event_base_add_virtual(bev->ev_base);
+       }
+}
+
+static void
+bev_async_consider_writing(struct bufferevent_async *beva)
 {
        size_t at_most;
        int limit;
+       struct bufferevent *bev = &beva->bev.bev;
+
        /* Don't write if there's a write in progress, or we do not
-        * want to write. */
-       if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
+        * want to write, or when there's nothing left to write. */
+       if (beva->write_in_progress)
                return;
-       /* Don't write if there's nothing to write */
-       if (!evbuffer_get_length(b->bev.bev.output))
+       if (!beva->ok || !(bev->enabled&EV_WRITE) ||
+           !evbuffer_get_length(bev->output)) {
+               bev_async_del_write(beva);
                return;
+       }
 
-       at_most = evbuffer_get_length(b->bev.bev.output);
+       at_most = evbuffer_get_length(bev->output);
 
        /* XXXX This over-commits. */
-       limit = _bufferevent_get_write_max(&b->bev);
+       limit = _bufferevent_get_write_max(&beva->bev);
        if (at_most >= limit)
                at_most = limit;
 
-       if (b->bev.write_suspended)
+       if (beva->bev.write_suspended) {
+               bev_async_del_write(beva);
                return;
+       }
 
        /*  XXXX doesn't respect low-water mark very well. */
-       if (evbuffer_launch_write(b->bev.bev.output, at_most,
-           &b->write_overlapped)) {
-               EVUTIL_ASSERT(0);/* XXX act sensibly. */
+       bufferevent_incref(bev);
+       if (evbuffer_launch_write(bev->output, at_most,
+           &beva->write_overlapped)) {
+               bufferevent_decref(bev);
+               beva->ok = 0;
+               _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
        } else {
-               b->write_in_progress = 1;
+               beva->write_in_progress = 1;
+               bev_async_add_write(beva);
        }
 }
 
 static void
-bev_async_consider_reading(struct bufferevent_async *b)
+bev_async_consider_reading(struct bufferevent_async *beva)
 {
        size_t cur_size;
        size_t read_high;
        size_t at_most;
        int limit;
+       struct bufferevent *bev = &beva->bev.bev;
+
        /* Don't read if there is a read in progress, or we do not
         * want to read. */
-       if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
+       if (beva->read_in_progress)
+               return;
+       if (!beva->ok || !(bev->enabled&EV_READ)) {
+               bev_async_del_read(beva);
                return;
+       }
 
        /* Don't read if we're full */
-       cur_size = evbuffer_get_length(b->bev.bev.input);
-       read_high = b->bev.bev.wm_read.high;
+       cur_size = evbuffer_get_length(bev->input);
+       read_high = bev->wm_read.high;
        if (read_high) {
-               if (cur_size >= read_high)
+               if (cur_size >= read_high) {
+                       bev_async_del_read(beva);
                        return;
+               }
                at_most = read_high - cur_size;
        } else {
                at_most = 16384; /* FIXME totally magic. */
        }
 
        /* XXXX This over-commits. */
-       limit = _bufferevent_get_read_max(&b->bev);
+       limit = _bufferevent_get_read_max(&beva->bev);
        if (at_most >= limit)
                at_most = limit;
 
-       if (b->bev.read_suspended)
+       if (beva->bev.read_suspended) {
+               bev_async_del_read(beva);
                return;
+       }
 
-       if (evbuffer_launch_read(b->bev.bev.input, at_most,
-           &b->read_overlapped)) {
-               EVUTIL_ASSERT(0);
+       bufferevent_incref(bev);
+       if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
+               beva->ok = 0;
+               bufferevent_decref(bev);
+               _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
        } else {
-               b->read_in_progress = 1;
+               beva->read_in_progress = 1;
+               bev_async_add_read(beva);
        }
+
+       return;
 }
 
 static void
@@ -260,14 +335,19 @@ be_async_enable(struct bufferevent *buf, short what)
 static int
 be_async_disable(struct bufferevent *bev, short what)
 {
+       struct bufferevent_async *bev_async = upcast(bev);
        /* XXXX If we disable reading or writing, we may want to consider
         * canceling any in-progress read or write operation, though it might
         * not work. */
 
-       if (what & EV_READ)
+       if (what & EV_READ) {
                BEV_DEL_GENERIC_READ_TIMEOUT(bev);
-       if (what & EV_WRITE)
+               bev_async_del_read(bev_async);
+       }
+       if (what & EV_WRITE) {
                BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
+               bev_async_del_write(bev_async);
+       }
 
        return 0;
 }
@@ -275,18 +355,36 @@ be_async_disable(struct bufferevent *bev, short what)
 static void
 be_async_destruct(struct bufferevent *bev)
 {
+       struct bufferevent_async *bev_async = upcast(bev);
        struct bufferevent_private *bev_p = BEV_UPCAST(bev);
        evutil_socket_t fd;
 
-       EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress);
+       EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
+                       !upcast(bev)->read_in_progress);
+
+       bev_async_del_read(bev_async);
+       bev_async_del_write(bev_async);
 
-       /* XXX cancel any outstanding I/O operations */
        fd = _evbuffer_overlapped_get_fd(bev->input);
-       /* delete this in case non-blocking connect was used */
-       event_del(&bev->ev_write);
        if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
                evutil_closesocket(fd);
-       _bufferevent_del_generic_timeout_cbs(bev);
+       /* delete this in case non-blocking connect was used */
+       if (event_initialized(&bev->ev_write)) {
+               event_del(&bev->ev_write);
+               _bufferevent_del_generic_timeout_cbs(bev);
+       }
+}
+
+/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
+ * we use WSAGetOverlappedResult to translate. */
+static void
+bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
+{
+       DWORD bytes, flags;
+       evutil_socket_t fd;
+
+       fd = _evbuffer_overlapped_get_fd(bev->input);
+       WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
 }
 
 static int
@@ -303,15 +401,22 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
        struct bufferevent_async *bev_a = upcast_connect(eo);
        struct bufferevent *bev = &bev_a->bev.bev;
 
-       _bufferevent_incref_and_lock(bev);
+       BEV_LOCK(bev);
 
        EVUTIL_ASSERT(bev_a->bev.connecting);
        bev_a->bev.connecting = 0;
+       event_base_del_virtual(bev->ev_base);
+
+       if (ok)
+               bufferevent_async_set_connected(bev);
+       else
+               bev_async_set_wsa_error(bev, eo);
 
-       bufferevent_async_set_connected(bev);
        _bufferevent_run_eventcb(bev,
                        ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
 
+       event_base_del_virtual(bev->ev_base);
+
        _bufferevent_decref_and_unlock(bev);
 }
 
@@ -323,26 +428,32 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
        struct bufferevent *bev = &bev_a->bev.bev;
        short what = BEV_EVENT_READING;
 
-       _bufferevent_incref_and_lock(bev);
-       EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress);
+       BEV_LOCK(bev);
+       EVUTIL_ASSERT(bev_a->read_in_progress);
 
        evbuffer_commit_read(bev->input, nbytes);
        bev_a->read_in_progress = 0;
 
-       if (ok && nbytes) {
-               BEV_RESET_GENERIC_READ_TIMEOUT(bev);
-               _bufferevent_decrement_read_buckets(&bev_a->bev, nbytes);
-               if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
-                       _bufferevent_run_readcb(bev);
-               bev_async_consider_reading(bev_a);
-       } else if (!ok) {
-               what |= BEV_EVENT_ERROR;
-               bev_a->ok = 0;
-               _bufferevent_run_eventcb(bev, what);
-       } else if (!nbytes) {
-               what |= BEV_EVENT_EOF;
-               bev_a->ok = 0;
-               _bufferevent_run_eventcb(bev, what);
+       if (!ok)
+               bev_async_set_wsa_error(bev, eo);
+
+       if (bev_a->ok) {
+               if (ok && nbytes) {
+                       BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+                       _bufferevent_decrement_read_buckets(&bev_a->bev,
+                                       nbytes);
+                       if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
+                               _bufferevent_run_readcb(bev);
+                       bev_async_consider_reading(bev_a);
+               } else if (!ok) {
+                       what |= BEV_EVENT_ERROR;
+                       bev_a->ok = 0;
+                       _bufferevent_run_eventcb(bev, what);
+               } else if (!nbytes) {
+                       what |= BEV_EVENT_EOF;
+                       bev_a->ok = 0;
+                       _bufferevent_run_eventcb(bev, what);
+               }
        }
 
        _bufferevent_decref_and_unlock(bev);
@@ -356,26 +467,32 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
        struct bufferevent *bev = &bev_a->bev.bev;
        short what = BEV_EVENT_WRITING;
 
-       _bufferevent_incref_and_lock(bev);
-       EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress);
-
+       BEV_LOCK(bev);
+       EVUTIL_ASSERT(bev_a->write_in_progress);
        evbuffer_commit_write(bev->output, nbytes);
        bev_a->write_in_progress = 0;
 
-       if (ok && nbytes) {
-               BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
-               _bufferevent_decrement_write_buckets(&bev_a->bev, nbytes);
-               if (evbuffer_get_length(bev->output) <= bev->wm_write.low)
-                       _bufferevent_run_writecb(bev);
-               bev_async_consider_writing(bev_a);
-       } else if (!ok) {
-               what |= BEV_EVENT_ERROR;
-               bev_a->ok = 0;
-               _bufferevent_run_eventcb(bev, what);
-       } else if (!nbytes) {
-               what |= BEV_EVENT_EOF;
-               bev_a->ok = 0;
-               _bufferevent_run_eventcb(bev, what);
+       if (!ok)
+               bev_async_set_wsa_error(bev, eo);
+
+       if (bev_a->ok) {
+               if (ok && nbytes) {
+                       BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
+                       _bufferevent_decrement_write_buckets(&bev_a->bev,
+                                       nbytes);
+                       if (evbuffer_get_length(bev->output) <=
+                           bev->wm_write.low)
+                               _bufferevent_run_writecb(bev);
+                       bev_async_consider_writing(bev_a);
+               } else if (!ok) {
+                       what |= BEV_EVENT_ERROR;
+                       bev_a->ok = 0;
+                       _bufferevent_run_eventcb(bev, what);
+               } else if (!nbytes) {
+                       what |= BEV_EVENT_EOF;
+                       bev_a->ok = 0;
+                       _bufferevent_run_eventcb(bev, what);
+               }
        }
 
        _bufferevent_decref_and_unlock(bev);
@@ -423,8 +540,6 @@ bufferevent_async_new(struct event_base *base,
 
        evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
        evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
-       evbuffer_defer_callbacks(bev->input, base);
-       evbuffer_defer_callbacks(bev->output, base);
 
        event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
        event_overlapped_init(&bev_a->read_overlapped, read_complete);
@@ -497,11 +612,16 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
            WSAGetLastError() != WSAEINVAL)
                return -1;
 
+       event_base_add_virtual(bev->ev_base);
+       bufferevent_incref(bev);
        rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
                            &bev_async->connect_overlapped.overlapped);
        if (rc || WSAGetLastError() == ERROR_IO_PENDING)
                return 0;
 
+       event_base_del_virtual(bev->ev_base);
+       bufferevent_decref(bev);
+
        return -1;
 }
 
index e52c129bc248ca673973a120286bcb4d6faa8235..7d97d98e3f27bf56a9b8e321cba1b76cfb978a75 100644 (file)
@@ -182,6 +182,8 @@ struct event_base {
        /** Data to implement the common signal handelr code. */
        struct evsig_info sig;
 
+       /** Number of virtual events */
+       int virtual_event_count;
        /** Number of total events added to this event_base */
        int event_count;
        /** Number of total events active in this event_base */
@@ -313,6 +315,10 @@ int _evsig_restore_handler(struct event_base *base, int evsignal);
 
 void event_active_nolock(struct event *ev, int res, short count);
 
+/* FIXME document. */
+void event_base_add_virtual(struct event_base *base);
+void event_base_del_virtual(struct event_base *base);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/event.c b/event.c
index 5f4656fbf64343e7b5371efdfd0dadd021b75df7..aaf8f28b03f95d7bfe0c485bdf7f442e3cb49fe8 100644 (file)
--- a/event.c
+++ b/event.c
@@ -962,7 +962,7 @@ static int
 event_haveevents(struct event_base *base)
 {
        /* Caller must hold th_base_lock */
-       return (base->event_count > 0);
+       return (base->virtual_event_count > 0 || base->event_count > 0);
 }
 
 /* "closure" function called when processing active signal events */
@@ -2707,3 +2707,19 @@ event_base_dump_events(struct event_base *base, FILE *output)
                }
        }
 }
+
+void
+event_base_add_virtual(struct event_base *base)
+{
+       EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+       base->virtual_event_count++;
+       EVBASE_RELEASE_LOCK(base, th_base_lock);
+}
+
+void
+event_base_del_virtual(struct event_base *base)
+{
+       EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+       base->virtual_event_count--;
+       EVBASE_RELEASE_LOCK(base, th_base_lock);
+}
index 82fa9aeeed45c998d8975ee529f874edba118b63..19c7bffc65edf7ea71285660ed86671d58d6b48a 100644 (file)
@@ -36,6 +36,7 @@
 #include "log-internal.h"
 #include "mm-internal.h"
 #include "event-internal.h"
+#include "evthread-internal.h"
 
 #define NOTIFICATION_KEY ((ULONG_PTR)-1)
 
@@ -277,4 +278,3 @@ event_base_get_iocp(struct event_base *base)
        return NULL;
 #endif
 }
-