]> granicus.if.org Git - libevent/commitdiff
Move responsibility for IOCP callback into bufferevent_async.
authorNick Mathewson <nickm@torproject.org>
Tue, 17 Nov 2009 20:31:09 +0000 (20:31 +0000)
committerNick Mathewson <nickm@torproject.org>
Tue, 17 Nov 2009 20:31:09 +0000 (20:31 +0000)
This patch from Chris Davis saves some callback depth, and adds proper
ref-counting to bufferevents when there's a deferred evbuffer callback
inflight.  It could use a couple more comments to really nail down what
its invariants are.

svn:r1543

buffer.c
buffer_iocp.c
bufferevent.c
bufferevent_async.c
bufferevent_sock.c
evbuffer-internal.h
iocp-internal.h
test/regress_bufferevent.c
test/regress_iocp.c

index 8bd37d6e040acb35eb2082e045d08c19f12264b0..69810350fc04cd60d8c7285637e22f47615f184c 100644 (file)
--- a/buffer.c
+++ b/buffer.c
@@ -78,6 +78,9 @@
 #include "event2/event.h"
 #include "event2/buffer.h"
 #include "event2/buffer_compat.h"
+#include "event2/bufferevent.h"
+#include "event2/bufferevent_compat.h"
+#include "event2/bufferevent_struct.h"
 #include "event2/thread.h"
 #include "event-config.h"
 #include "log-internal.h"
@@ -85,6 +88,7 @@
 #include "util-internal.h"
 #include "evthread-internal.h"
 #include "evbuffer-internal.h"
+#include "bufferevent-internal.h"
 
 /* some systems do not have MAP_FAILED */
 #ifndef MAP_FAILED
@@ -276,6 +280,13 @@ _evbuffer_incref(struct evbuffer *buf)
        EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
 }
 
+void
+_evbuffer_incref_and_lock(struct evbuffer *buf)
+{
+       EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+       ++buf->refcnt;
+}
+
 int
 evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
 {
@@ -312,6 +323,14 @@ evbuffer_enable_locking(struct evbuffer *buf, void *lock)
 #endif
 }
 
+void
+evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev)
+{
+       EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+       buf->parent = bev;
+       EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+}
+
 static void
 evbuffer_run_callbacks(struct evbuffer *buffer)
 {
@@ -362,7 +381,10 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
        if (buffer->deferred_cbs) {
                if (buffer->deferred.queued)
                        return;
-               _evbuffer_incref(buffer);
+               _evbuffer_incref_and_lock(buffer);
+               if (buffer->parent)
+                       bufferevent_incref(buffer->parent);
+               EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
                event_deferred_cb_schedule(buffer->cb_queue, &buffer->deferred);
        } else {
                evbuffer_run_callbacks(buffer);
@@ -372,13 +394,17 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
 static void
 evbuffer_deferred_callback(struct deferred_cb *cb, void *arg)
 {
+       struct bufferevent *parent = NULL;
        struct evbuffer *buffer = arg;
 
        /* XXXX It would be better to run these callbacks without holding the
         * lock */
        EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
+       parent = buffer->parent;
        evbuffer_run_callbacks(buffer);
        _evbuffer_decref_and_unlock(buffer);
+       if (parent)
+               bufferevent_free(parent);
 }
 
 static void
index 2b346d3615b51f8ebf87308d279a20d46c71b2bc..7529061070f4aa2f51e9f13d65d59cca96822fb3 100644 (file)
 
 #define MAX_WSABUFS 16
 
-/** Wrapper for an OVERLAPPED that holds the necessary info to notice
-    when an overlapped read or write is done on an evbuffer.
- **/
-struct buffer_overlapped {
-       struct event_overlapped event_overlapped;
-
-       /** The first pinned chain in the buffer. */
-       struct evbuffer_chain *first_pinned;
-       /** The buffer itself. */
-       struct evbuffer_overlapped *buf;
-       /** How many chains are pinned; how many of the fields in buffers
-        * are we using. */
-       int n_buffers;
-       WSABUF buffers[MAX_WSABUFS];
-};
-
 /** An evbuffer that can handle overlapped IO. */
 struct evbuffer_overlapped {
        struct evbuffer buffer;
        /** The socket that we're doing overlapped IO on. */
        evutil_socket_t fd;
-       /** True iff we have scheduled a write. */
-       unsigned write_in_progress : 1;
-       /** True iff we have scheduled a read. */
+
+       /** pending I/O type */
        unsigned read_in_progress : 1;
+       unsigned write_in_progress : 1;
+
+       /** The first pinned chain in the buffer. */
+       struct evbuffer_chain *first_pinned;
 
-       struct buffer_overlapped read_info;
-       struct buffer_overlapped write_info;
+       /** How many chains are pinned; how many of the fields in buffers
+        * are we using. */
+       int n_buffers;
+       WSABUF buffers[MAX_WSABUFS];
 };
 
 /** Given an evbuffer, return the correponding evbuffer structure, or NULL if
@@ -88,52 +77,40 @@ upcast_evbuffer(struct evbuffer *buf)
        return EVUTIL_UPCAST(buf, struct evbuffer_overlapped, buffer);
 }
 
-static inline struct buffer_overlapped *
-upcast_overlapped(struct event_overlapped *o)
-{
-       return EVUTIL_UPCAST(o, struct buffer_overlapped, event_overlapped);
-}
-
 /** Unpin all the chains noted as pinned in 'eo'. */
 static void
-pin_release(struct event_overlapped *eo, unsigned flag)
+pin_release(struct evbuffer_overlapped *eo, unsigned flag)
 {
        int i;
-       struct buffer_overlapped *bo = upcast_overlapped(eo);
-       struct evbuffer_chain *chain = bo->first_pinned;
+       struct evbuffer_chain *chain = eo->first_pinned;
 
-       for (i = 0; i < bo->n_buffers; ++i) {
+       for (i = 0; i < eo->n_buffers; ++i) {
                EVUTIL_ASSERT(chain);
                _evbuffer_chain_unpin(chain, flag);
                chain = chain->next;
        }
 }
 
-/** IOCP callback invoked when a read operation is finished. */
-static void
-read_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int ok)
+void
+evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes)
 {
-       struct buffer_overlapped *buf_o = upcast_overlapped(eo);
-       struct evbuffer_overlapped *buf = buf_o->buf;
-       struct evbuffer *evbuf = &buf->buffer;
-
+       struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
        struct evbuffer_iovec iov[2];
        int n_vec;
 
-       // XXXX use ok
+       EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
+       EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
        EVUTIL_ASSERT(nBytes >= 0); // XXXX Can this be false?
 
-       EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
-       buf->read_in_progress = 0;
        evbuffer_unfreeze(evbuf, 0);
 
-       iov[0].iov_base = buf_o->buffers[0].buf;
-       if ((size_t)nBytes <= buf_o->buffers[0].len) {
+       iov[0].iov_base = buf->buffers[0].buf;
+       if ((size_t)nBytes <= buf->buffers[0].len) {
                iov[0].iov_len = nBytes;
                n_vec = 1;
        } else {
-               iov[0].iov_len = buf_o->buffers[0].len;
-               iov[1].iov_base = buf_o->buffers[1].buf;
+               iov[0].iov_len = buf->buffers[0].len;
+               iov[1].iov_base = buf->buffers[1].buf;
                iov[1].iov_len = nBytes - iov[0].iov_len;
                n_vec = 2;
        }
@@ -141,26 +118,24 @@ read_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int
        if (evbuffer_commit_space(evbuf, iov, n_vec) < 0)
                EVUTIL_ASSERT(0); /* XXXX fail nicer. */
 
-       pin_release(eo, EVBUFFER_MEM_PINNED_R);
+       pin_release(buf, EVBUFFER_MEM_PINNED_R);
+
+       buf->read_in_progress = 0;
 
        _evbuffer_decref_and_unlock(evbuf);
 }
 
-/** IOCP callback invoked when a write operation is finished. */
-static void
-write_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int ok)
+void
+evbuffer_commit_write(struct evbuffer *evbuf, ev_ssize_t nBytes)
 {
-       // XXX use ok
-       struct buffer_overlapped *buf_o = upcast_overlapped(eo);
-       struct evbuffer_overlapped *buf = buf_o->buf;
-
-       struct evbuffer *evbuf = &buf->buffer;
+       struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
 
        EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
-       buf->write_in_progress = 0;
+       EVUTIL_ASSERT(buf->write_in_progress && !buf->read_in_progress);
        evbuffer_unfreeze(evbuf, 1);
        evbuffer_drain(evbuf, nBytes);
-       pin_release(eo,EVBUFFER_MEM_PINNED_W);
+       pin_release(buf,EVBUFFER_MEM_PINNED_W);
+       buf->write_in_progress = 0;
        _evbuffer_decref_and_unlock(evbuf);
 }
 
@@ -181,7 +156,8 @@ evbuffer_overlapped_new(evutil_socket_t fd)
 }
 
 int
-evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
+evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
+               struct event_overlapped *ol)
 {
        struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
        int r = -1;
@@ -195,6 +171,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
        }
 
        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+       EVUTIL_ASSERT(!buf_o->read_in_progress);
        if (buf->freeze_start || buf_o->write_in_progress)
                goto done;
        if (!buf->total_len) {
@@ -206,14 +183,14 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
        }
        evbuffer_freeze(buf, 1);
 
-       /* XXX we could move much of this into the constructor. */
-       memset(&buf_o->write_info, 0, sizeof(buf_o->write_info));
-       buf_o->write_info.buf = buf_o;
-       buf_o->write_info.event_overlapped.cb = write_completed;
-       chain = buf_o->write_info.first_pinned = buf->first;
+       buf_o->first_pinned = 0;
+       buf_o->n_buffers = 0;
+       memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
+
+       chain = buf_o->first_pinned = buf->first;
 
        for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) {
-               WSABUF *b = &buf_o->write_info.buffers[i];
+               WSABUF *b = &buf_o->buffers[i];
                b->buf = chain->buffer + chain->misalign;
                _evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_W);
 
@@ -227,14 +204,14 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
                }
        }
 
-       buf_o->write_info.n_buffers = i;
+       buf_o->n_buffers = i;
        _evbuffer_incref(buf);
-       if (WSASend(buf_o->fd, buf_o->write_info.buffers, i, &bytesSent, 0,
-               &buf_o->write_info.event_overlapped.overlapped, NULL)) {
+       if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0,
+               &ol->overlapped, NULL)) {
                int error = WSAGetLastError();
                if (error != WSA_IO_PENDING) {
                        /* An actual error. */
-                       pin_release(&buf_o->write_info.event_overlapped, EVBUFFER_MEM_PINNED_W);
+                       pin_release(buf_o, EVBUFFER_MEM_PINNED_W);
                        evbuffer_unfreeze(buf, 1);
                        evbuffer_free(buf); /* decref */
                        goto done;
@@ -249,7 +226,8 @@ done:
 }
 
 int
-evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
+evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
+               struct event_overlapped *ol)
 {
        struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
        int r = -1, i;
@@ -263,28 +241,28 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
        if (!buf_o)
                return -1;
        EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+       EVUTIL_ASSERT(!buf_o->write_in_progress);
        if (buf->freeze_end || buf_o->read_in_progress)
                goto done;
 
+       buf_o->first_pinned = 0;
+       buf_o->n_buffers = 0;
+       memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
+
        if (_evbuffer_expand_fast(buf, at_most) == -1)
                goto done;
        evbuffer_freeze(buf, 0);
 
-       /* XXX we could move much of this into the constructor. */
-       memset(&buf_o->read_info, 0, sizeof(buf_o->read_info));
-       buf_o->read_info.buf = buf_o;
-       buf_o->read_info.event_overlapped.cb = read_completed;
-
        nvecs = _evbuffer_read_setup_vecs(buf, at_most,
            vecs, &chain, 1);
        for (i=0;i<nvecs;++i) {
                WSABUF_FROM_EVBUFFER_IOV(
-                       &buf_o->read_info.buffers[i],
+                       &buf_o->buffers[i],
                        &vecs[i]);
        }
 
-       buf_o->read_info.n_buffers = nvecs;
-       buf_o->read_info.first_pinned = chain;
+       buf_o->n_buffers = nvecs;
+       buf_o->first_pinned = chain;
        npin=0;
        for ( ; chain; chain = chain->next) {
                _evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_R);
@@ -293,11 +271,12 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
        EVUTIL_ASSERT(npin == nvecs);
 
        _evbuffer_incref(buf);
-       if (WSARecv(buf_o->fd, buf_o->read_info.buffers, nvecs, &bytesRead, &flags, &buf_o->read_info.event_overlapped.overlapped, NULL)) {
+       if (WSARecv(buf_o->fd, buf_o->buffers, nvecs, &bytesRead, &flags,
+                   &ol->overlapped, NULL)) {
                int error = WSAGetLastError();
                if (error != WSA_IO_PENDING) {
                        /* An actual error. */
-                       pin_release(&buf_o->read_info.event_overlapped, EVBUFFER_MEM_PINNED_R);
+                       pin_release(buf_o, EVBUFFER_MEM_PINNED_R);
                        evbuffer_unfreeze(buf, 0);
                        evbuffer_free(buf); /* decref */
                        goto done;
index f291f29fe459ee72b3a8f45725d7b110497b093a..39a062c1e3b0010c4efe4db2ba36134241944f38 100644 (file)
 #include <errno.h>
 
 #include "event2/util.h"
-#include "event2/bufferevent.h"
 #include "event2/buffer.h"
+#include "event2/buffer_compat.h"
+#include "event2/bufferevent.h"
 #include "event2/bufferevent_struct.h"
 #include "event2/bufferevent_compat.h"
 #include "event2/event.h"
 #include "log-internal.h"
 #include "mm-internal.h"
 #include "bufferevent-internal.h"
+#include "evbuffer-internal.h"
 #include "util-internal.h"
 
 void
@@ -257,6 +259,9 @@ bufferevent_init_common(struct bufferevent_private *bufev_private,
 
        bufev_private->options = options;
 
+       evbuffer_set_parent(bufev->input, bufev);
+       evbuffer_set_parent(bufev->output, bufev);
+
        return 0;
 }
 
@@ -494,6 +499,9 @@ _bufferevent_decref_and_unlock(struct bufferevent *bufev)
        if (bufev->be_ops->destruct)
                bufev->be_ops->destruct(bufev);
 
+       /* XXX what happens if refcnt for these buffers is > 1?
+        * The buffers can share a lock with this bufferevent object,
+        * but the lock might be destroyed below. */
        /* evbuffer will free the callbacks */
        evbuffer_free(bufev->input);
        evbuffer_free(bufev->output);
@@ -631,7 +639,7 @@ _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev)
 {
        evtimer_assign(&bev->ev_read, bev->ev_base,
            bufferevent_generic_read_timeout_cb, bev);
-       evtimer_assign(&bev->ev_read, bev->ev_base,
+       evtimer_assign(&bev->ev_write, bev->ev_base,
            bufferevent_generic_write_timeout_cb, bev);
 }
 
index d34051ce753afdf3e5b4a4387a6008f3dfcf01b0..a8e92b70ee60932339a686f5129d74b465766b00 100644 (file)
@@ -80,8 +80,11 @@ const struct bufferevent_ops bufferevent_ops_async = {
 struct bufferevent_async {
        struct bufferevent_private bev;
        struct event_overlapped connect_overlapped;
+       struct event_overlapped read_overlapped;
+       struct event_overlapped write_overlapped;
        unsigned read_in_progress : 1;
        unsigned write_in_progress : 1;
+       unsigned ok : 1;
 };
 
 static inline struct bufferevent_async *
@@ -91,16 +94,33 @@ upcast(struct bufferevent *bev)
        if (bev->be_ops != &bufferevent_ops_async)
                return NULL;
        bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
-       EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
        return bev_a;
 }
 
 static inline struct bufferevent_async *
-upcast_overlapped(struct event_overlapped *eo)
+upcast_connect(struct event_overlapped *eo)
 {
        struct bufferevent_async *bev_a;
        bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
-       EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
+       EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
+       return bev_a;
+}
+
+static inline struct bufferevent_async *
+upcast_read(struct event_overlapped *eo)
+{
+       struct bufferevent_async *bev_a;
+       bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
+       EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
+       return bev_a;
+}
+
+static inline struct bufferevent_async *
+upcast_write(struct event_overlapped *eo)
+{
+       struct bufferevent_async *bev_a;
+       bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
+       EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
        return bev_a;
 }
 
@@ -109,14 +129,15 @@ bev_async_consider_writing(struct bufferevent_async *b)
 {
        /* Don't write if there's a write in progress, or we do not
         * want to write. */
-       if (b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
+       if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
                return;
        /* Don't write if there's nothing to write */
        if (!evbuffer_get_length(b->bev.bev.output))
                return;
 
        /*  XXXX doesn't respect low-water mark very well. */
-       if (evbuffer_launch_write(b->bev.bev.output, -1)) {
+       if (evbuffer_launch_write(b->bev.bev.output, -1,
+           &b->write_overlapped)) {
                EVUTIL_ASSERT(0);/* XXX act sensibly. */
        } else {
                b->write_in_progress = 1;
@@ -131,7 +152,7 @@ bev_async_consider_reading(struct bufferevent_async *b)
        size_t at_most;
        /* Don't read if there is a read in progress, or we do not
         * want to read. */
-       if (b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
+       if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
                return;
 
        /* Don't read if we're full */
@@ -145,7 +166,8 @@ bev_async_consider_reading(struct bufferevent_async *b)
                at_most = 16384; /* FIXME totally magic. */
        }
 
-       if (evbuffer_launch_read(b->bev.bev.input, at_most)) {
+       if (evbuffer_launch_read(b->bev.bev.input, at_most,
+           &b->read_overlapped)) {
                EVUTIL_ASSERT(0);
        } else {
                b->read_in_progress = 1;
@@ -159,26 +181,15 @@ be_async_outbuf_callback(struct evbuffer *buf,
 {
        struct bufferevent *bev = arg;
        struct bufferevent_async *bev_async = upcast(bev);
-       /* If we successfully wrote from the outbuf, or we added data to the
-        * outbuf and were not writing before, we may want to write now. */
+
+       /* If we added data to the outbuf and were not writing before, 
+        * we may want to write now. */
 
        _bufferevent_incref_and_lock(bev);
-       if (cbinfo->n_deleted) {
-               /* XXXX can't detect 0-length write completion */
-               bev_async->write_in_progress = 0;
-       }
 
-       if (cbinfo->n_added || cbinfo->n_deleted)
+       if (cbinfo->n_added)
                bev_async_consider_writing(bev_async);
 
-       if (cbinfo->n_deleted) {
-               BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
-
-               if (bev->writecb != NULL &&
-                   evbuffer_get_length(bev->output) <= bev->wm_write.low)
-                       _bufferevent_run_writecb(bev);
-       }
-
        _bufferevent_decref_and_unlock(bev);
 }
 
@@ -190,26 +201,14 @@ be_async_inbuf_callback(struct evbuffer *buf,
        struct bufferevent *bev = arg;
        struct bufferevent_async *bev_async = upcast(bev);
 
-       /* If we successfully read into the inbuf, or we drained data from
-        * the inbuf and were not reading before, we may want to read now */
+       /* If we drained data from the inbuf and were not reading before,
+        * we may want to read now */
 
        _bufferevent_incref_and_lock(bev);
-       if (cbinfo->n_added) {
-               /* XXXX can't detect 0-length read completion */
-               bev_async->read_in_progress = 0;
-       }
 
-       if (cbinfo->n_added || cbinfo->n_deleted)
+       if (cbinfo->n_deleted)
                bev_async_consider_reading(bev_async);
 
-       if (cbinfo->n_added) {
-               BEV_RESET_GENERIC_READ_TIMEOUT(bev);
-
-               if (evbuffer_get_length(bev->input) >= bev->wm_read.low &&
-                   bev->readcb != NULL)
-                       _bufferevent_run_readcb(bev);
-       }
-
        _bufferevent_decref_and_unlock(bev);
 }
 
@@ -218,6 +217,10 @@ be_async_enable(struct bufferevent *buf, short what)
 {
        struct bufferevent_async *bev_async = upcast(buf);
 
+       if (!bev_async->ok)
+               return -1;
+
+       /* NOTE: This interferes with non-blocking connect */
        _bufferevent_generic_adj_timeouts(buf);
 
        /* If we newly enable reading or writing, and we aren't reading or
@@ -245,6 +248,17 @@ be_async_disable(struct bufferevent *bev, short what)
 static void
 be_async_destruct(struct bufferevent *bev)
 {
+       struct bufferevent_private *bev_p = BEV_UPCAST(bev);
+       evutil_socket_t fd;
+
+       EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress);
+
+       /* XXX cancel any outstanding I/O operations */
+       fd = _evbuffer_overlapped_get_fd(bev->input);
+       /* delete this in case non-blocking connect was used */
+       event_del(&bev->ev_write);
+       if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
+               EVUTIL_CLOSESOCKET(fd);
        _bufferevent_del_generic_timeout_cbs(bev);
 }
 
@@ -259,20 +273,87 @@ static void
 connect_complete(struct event_overlapped *eo, uintptr_t key,
     ev_ssize_t nbytes, int ok)
 {
-       struct bufferevent_async *bev_a = upcast_overlapped(eo);
-       struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */
+       struct bufferevent_async *bev_a = upcast_connect(eo);
+       struct bufferevent *bev = &bev_a->bev.bev;
 
        _bufferevent_incref_and_lock(bev);
 
        EVUTIL_ASSERT(bev_a->bev.connecting);
        bev_a->bev.connecting = 0;
 
+       bufferevent_async_set_connected(bev);
        _bufferevent_run_eventcb(bev,
                        ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
 
        _bufferevent_decref_and_unlock(bev);
 }
 
+static void
+read_complete(struct event_overlapped *eo, uintptr_t key,
+    ev_ssize_t nbytes, int ok)
+{
+       struct bufferevent_async *bev_a = upcast_read(eo);
+       struct bufferevent *bev = &bev_a->bev.bev;
+       short what = BEV_EVENT_READING;
+
+       _bufferevent_incref_and_lock(bev);
+       EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress);
+
+       evbuffer_commit_read(bev->input, nbytes);
+       bev_a->read_in_progress = 0;
+
+       if (ok && nbytes) {
+               BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+               if (bev->readcb != NULL &&
+                   evbuffer_get_length(bev->input) >= bev->wm_read.low)
+                       _bufferevent_run_readcb(bev);
+               bev_async_consider_reading(bev_a);
+       } else if (!ok) {
+               what |= BEV_EVENT_ERROR;
+               bev_a->ok = 0;
+               _bufferevent_run_eventcb(bev, what);
+       } else if (!nbytes) {
+               what |= BEV_EVENT_EOF;
+               bev_a->ok = 0;
+               _bufferevent_run_eventcb(bev, what);
+       }
+
+       _bufferevent_decref_and_unlock(bev);
+}
+
+static void
+write_complete(struct event_overlapped *eo, uintptr_t key,
+    ev_ssize_t nbytes, int ok)
+{
+       struct bufferevent_async *bev_a = upcast_write(eo);
+       struct bufferevent *bev = &bev_a->bev.bev;
+       short what = BEV_EVENT_WRITING;
+
+       _bufferevent_incref_and_lock(bev);
+       EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress);
+
+       evbuffer_commit_write(bev->output, nbytes);
+       bev_a->write_in_progress = 0;
+
+       if (ok && nbytes) {
+               BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
+               if (bev->writecb != NULL && 
+                   evbuffer_get_length(bev->output) <= bev->wm_write.low)
+                       _bufferevent_run_writecb(bev);
+               bev_async_consider_writing(bev_a);
+       } else if (!ok) {
+               what |= BEV_EVENT_ERROR;
+               bev_a->ok = 0;
+               _bufferevent_run_eventcb(bev, what);
+       } else if (!nbytes) {
+               what |= BEV_EVENT_EOF;
+               bev_a->ok = 0;
+               _bufferevent_run_eventcb(bev, what);
+       }
+
+       _bufferevent_decref_and_unlock(bev);
+}
+
 struct bufferevent *
 bufferevent_async_new(struct event_base *base,
     evutil_socket_t fd, int options)
@@ -318,10 +399,11 @@ bufferevent_async_new(struct event_base *base,
        evbuffer_defer_callbacks(bev->input, base);
        evbuffer_defer_callbacks(bev->output, base);
 
-       evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
-       _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
-
        event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
+       event_overlapped_init(&bev_a->read_overlapped, read_complete);
+       event_overlapped_init(&bev_a->write_overlapped, write_complete);
+
+       bev_a->ok = fd >= 0;
 
        return bev;
 err:
@@ -329,6 +411,16 @@ err:
        return NULL;
 }
 
+void
+bufferevent_async_set_connected(struct bufferevent *bev)
+{
+       struct bufferevent_async *bev_async = upcast(bev);
+       bev_async->ok = 1;
+       _bufferevent_init_generic_timeout_cbs(bev);
+       /* Now's a good time to consider reading/writing */
+       be_async_enable(bev, bev->enabled);
+}
+
 int
 bufferevent_async_can_connect(struct bufferevent *bev)
 {
@@ -369,7 +461,7 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
                sin6->sin6_family = AF_INET6;
                sin6->sin6_addr = in6addr_any;
        } else {
-               /* XXX: what to do? */
+               /* Well, the user will have to bind() */
                return -1;
        }
        if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
index 5477d074f9da61c9ca6a6ff525ca15c556bab7a0..1ae367b1b0f9e27ad10a9fbcc09d2d76fbe11c35 100644 (file)
@@ -212,8 +212,18 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
                        goto done;
                } else {
                        connected = 1;
-                       _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED);
-                       if (!(bufev->enabled & EV_WRITE) || BEV_IS_ASYNC(bufev)) {
+#ifdef WIN32
+                       if (BEV_IS_ASYNC(bufev)) {
+                               event_del(&bufev->ev_write);
+                               bufferevent_async_set_connected(bufev);
+                               _bufferevent_run_eventcb(bufev,
+                                               BEV_EVENT_CONNECTED);
+                               goto done;
+                       }
+#endif
+                       _bufferevent_run_eventcb(bufev,
+                                       BEV_EVENT_CONNECTED);
+                       if (!(bufev->enabled & EV_WRITE)) {
                                event_del(&bufev->ev_write);
                                goto done;
                        }
index d0129804896b84d065d853459abf413b8ead9d66..4499fec1f897f7ed44a8bcc42a82a3540d6e3cd8 100644 (file)
@@ -66,6 +66,7 @@ struct evbuffer_cb_entry {
 #endif
 };
 
+struct bufferevent;
 struct evbuffer_chain;
 struct evbuffer {
        /** The first chain in this buffer's linked list of chains. */
@@ -135,6 +136,10 @@ struct evbuffer {
 
        /** A doubly-linked-list of callback functions */
        TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;
+
+       /** The parent bufferevent object this evbuffer belongs to.
+        * NULL if the evbuffer stands alone. */
+       struct bufferevent *parent;
 };
 
 /** A single item in an evbuffer. */
@@ -245,6 +250,8 @@ struct evbuffer_chain_reference {
 
 /** Increase the reference count of buf by one. */
 void _evbuffer_incref(struct evbuffer *buf);
+/** Increase the reference count of buf by one and acquire the lock. */
+void _evbuffer_incref_and_lock(struct evbuffer *buf);
 /** Pin a single buffer chain using a given flag. A pinned chunk may not be
  * moved or freed until it is unpinned. */
 void _evbuffer_chain_pin(struct evbuffer_chain *chain, unsigned flag);
@@ -273,6 +280,9 @@ int _evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch,
                (i)->len = (ei)->iov_len;               \
        } while(0)
 
+/** Set the parent bufferevent object for buf to bev */
+void evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev);
+
 #ifdef __cplusplus
 }
 #endif
index c0500ace0b10b2a031a821a267a9d73c793695d4..0f8d3c81089f64698d642d29d3d63d1da7cb00d6 100644 (file)
@@ -124,24 +124,32 @@ void _evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd);
     An evbuffer can only have one read pending at a time.  While the read
     is in progress, no other data may be added to the end of the buffer.
     The buffer must be created with event_overlapped_init().
+    evbuffer_commit_read() must be called in the completion callback. 
 
     @param buf The buffer to read onto
     @param n The number of bytes to try to read.
+    @param ol Overlapped object with associated completion callback.
     @return 0 on success, -1 on error.
  */
-int evbuffer_launch_read(struct evbuffer *, size_t n);
+int evbuffer_launch_read(struct evbuffer *buf, size_t n, struct event_overlapped *ol);
 
 /** Start writing data from the start of an evbuffer.
 
     An evbuffer can only have one write pending at a time.  While the write is
     in progress, no other data may be removed from the front of the buffer.
     The buffer must be created with event_overlapped_init().
+    evbuffer_commit_write() must be called in the completion callback. 
 
     @param buf The buffer to read onto
     @param n The number of bytes to try to read.
+    @param ol Overlapped object with associated completion callback.
     @return 0 on success, -1 on error.
  */
-int evbuffer_launch_write(struct evbuffer *, ev_ssize_t n);
+int evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t n, struct event_overlapped *ol);
+
+/** XXX document */
+void evbuffer_commit_read(struct evbuffer *, ev_ssize_t);
+void evbuffer_commit_write(struct evbuffer *, ev_ssize_t);
 
 /** Create an IOCP, and launch its worker threads.  Internal use only.
 
@@ -179,6 +187,7 @@ struct bufferevent *bufferevent_async_new(struct event_base *base,
     evutil_socket_t fd, int options);
 
 /* FIXME document. */
+void bufferevent_async_set_connected(struct bufferevent *bev);
 int bufferevent_async_can_connect(struct bufferevent *bev);
 int bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
        const struct sockaddr *sa, int socklen);
index 334261de718d1632f16163c1398c031c4a45b7fc..dc9e17f91c0c3fe4535749fe2c4ea3be8d57f483 100644 (file)
@@ -497,6 +497,13 @@ test_bufferevent_connect(void *arg)
        bufferevent_enable(bev1, EV_READ);
        bufferevent_enable(bev2, EV_READ);
 
+#ifdef WIN32
+       /* FIXME this is to get IOCP to work. it shouldn't be required. */
+       {
+               struct timeval tv = {5000,0};
+               event_base_loopexit(data->base, &tv);
+       }
+#endif
        event_base_dispatch(data->base);
 
        tt_int_op(n_strings_read, ==, 2);
@@ -580,6 +587,13 @@ test_bufferevent_connect_fail(void *arg)
        event_add(&close_listener_event, &one_second);
        close_listener_event_added = 1;
 
+#ifdef WIN32
+       /* FIXME this is to get IOCP to work. it shouldn't be required. */
+       {
+               struct timeval tv = {5000,0};
+               event_base_loopexit(data->base, &tv);
+       }
+#endif
        event_base_dispatch(data->base);
 
        tt_int_op(test_ok, ==, 1);
@@ -628,7 +642,6 @@ struct testcase_t bufferevent_iocp_testcases[] = {
         LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP),
         LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP),
         LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP),
-#if 0
        { "bufferevent_connect", test_bufferevent_connect,
          TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" },
        { "bufferevent_connect_defer", test_bufferevent_connect,
@@ -639,14 +652,11 @@ struct testcase_t bufferevent_iocp_testcases[] = {
        { "bufferevent_connect_lock_defer", test_bufferevent_connect,
          TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
          (void*)"defer lock" },
-#endif
        { "bufferevent_connect_fail", test_bufferevent_connect_fail,
          TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
-#if 0
        { "bufferevent_connect_nonblocking", test_bufferevent_connect,
          TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup,
          (void*)"unset_connectex" },
-#endif
 
         END_OF_TESTCASES,
 };
index f1e9af6e0ad6d5c10583ae5ddf36195a9d05ea46..81866dfe8fcd0674622bac745cd9dd5568938dba 100644 (file)
@@ -152,15 +152,40 @@ end:
        ;
 }
 
+static struct evbuffer *rbuf = NULL, *wbuf = NULL;
+
+static void
+read_complete(struct event_overlapped *eo, uintptr_t key,
+    ev_ssize_t nbytes, int ok)
+{
+       tt_assert(ok);
+       evbuffer_commit_read(rbuf, nbytes);
+end:
+       ;
+}
+
+static void
+write_complete(struct event_overlapped *eo, uintptr_t key,
+    ev_ssize_t nbytes, int ok)
+{
+       tt_assert(ok);
+       evbuffer_commit_write(wbuf, nbytes);
+end:
+       ;
+}
+
 static void
 test_iocp_evbuffer(void *ptr)
 {
+       struct event_overlapped rol, wol;
        struct basic_test_data *data = ptr;
        struct event_iocp_port *port = NULL;
-       struct evbuffer *rbuf = NULL, *wbuf = NULL;
        char junk[1024];
        int i;
 
+       event_overlapped_init(&rol, read_complete);
+       event_overlapped_init(&wol, write_complete);
+
 #ifdef WIN32
        evthread_use_windows_threads();
 #endif
@@ -185,8 +210,8 @@ test_iocp_evbuffer(void *ptr)
                evbuffer_add(wbuf, junk, sizeof(junk));
 
        tt_assert(!evbuffer_get_length(rbuf));
-       tt_assert(!evbuffer_launch_write(wbuf, 512));
-       tt_assert(!evbuffer_launch_read(rbuf, 2048));
+       tt_assert(!evbuffer_launch_write(wbuf, 512, &wol));
+       tt_assert(!evbuffer_launch_read(rbuf, 2048, &rol));
 
 #ifdef WIN32
        /* FIXME this is stupid. */