]> granicus.if.org Git - libevent/commitdiff
Add a linked-pair abstraction to bufferevents.
authorNick Mathewson <nickm@torproject.org>
Fri, 10 Apr 2009 15:01:31 +0000 (15:01 +0000)
committerNick Mathewson <nickm@torproject.org>
Fri, 10 Apr 2009 15:01:31 +0000 (15:01 +0000)
The new bufferevent_pair abstraction works like a set of buferevent_sockets
connected by a socketpair, except that it doesn't require a socketpair,
and therefore doesn't need to get the kernel involved.

It's also a good way to make sure that deferred callbacks work.  It's a good
use case for deferred callbacks: before I implemented them, the recursive
relationship between the evbuffer callback and the read callback would
make the unit tests overflow the stack.

svn:r1152

Makefile.am
bufferevent-internal.h
bufferevent_filter.c
bufferevent_pair.c [new file with mode: 0644]
bufferevent_sock.c
include/event2/bufferevent.h
test/regress_bufferevent.c

index 13d1354616b220e5a43ff2909415a61d3db80721..1c7cb87ebeb07d9b0ca7df47c69e7ea9f3b28c4f 100644 (file)
@@ -90,6 +90,7 @@ event-config.h: config.h
 
 CORE_SRC = event.c buffer.c \
        bufferevent.c bufferevent_sock.c bufferevent_filter.c \
+       bufferevent_pair.c \
        evmap.c log.c evutil.c strlcpy.c $(SYS_SRC)
 EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c
 
index a08dfe869ce08251a2a7e2c6ab5f98a150a9604e..1d4b80d10611a2a805073819aaf3b2da9b8353ee 100644 (file)
@@ -76,8 +76,9 @@ struct bufferevent_ops {
         int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);
 };
 
-extern const struct bufferevent_ops be_ops_socket;
-extern const struct bufferevent_ops be_ops_filter;
+extern const struct bufferevent_ops bufferevent_ops_socket;
+extern const struct bufferevent_ops bufferevent_ops_filter;
+extern const struct bufferevent_ops bufferevent_ops_pair;
 
 /** Initialize the shared parts of a bufferevent. */
 int bufferevent_init_common(struct bufferevent *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);
index ed3e4122406ce2bbe7f50acd08c5e8d50994841d..fcd090be0488cffe549300d848be94c5817bfb54 100644 (file)
@@ -97,7 +97,7 @@ struct bufferevent_filtered {
        void *context;
 };
 
-struct bufferevent_ops bufferevent_ops_filter = {
+const struct bufferevent_ops bufferevent_ops_filter = {
        "filter",
        evutil_offsetof(struct bufferevent_filtered, bev),
        be_filter_enable,
diff --git a/bufferevent_pair.c b/bufferevent_pair.c
new file mode 100644 (file)
index 0000000..5e12ae8
--- /dev/null
@@ -0,0 +1,276 @@
+/*
+ * Copyright (c) 2009 Niels Provos, Nick Mathewson
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <sys/types.h>
+#include <assert.h>
+
+#ifdef HAVE_CONFIG_H
+#include "event-config.h"
+#endif
+
+#include "event2/util.h"
+#include "event2/buffer.h"
+#include "event2/bufferevent.h"
+#include "event2/bufferevent_struct.h"
+#include "event2/event.h"
+#include "defer-internal.h"
+#include "bufferevent-internal.h"
+#include "mm-internal.h"
+
+struct bufferevent_pair {
+       struct bufferevent bev;
+       struct bufferevent_pair *partner;
+       struct deferred_cb deferred_write_cb;
+       struct deferred_cb deferred_read_cb;
+};
+
+
+/* Given a bufferevent that's really a bev part of a bufferevent_pair,
+ * return that bufferevent_filtered. Returns NULL otherwise.*/
+static inline struct bufferevent_pair *
+upcast(struct bufferevent *bev)
+{
+       struct bufferevent_pair *bev_p;
+       if (bev->be_ops != &bufferevent_ops_pair)
+               return NULL;
+       bev_p = (void*)( ((char*)bev) -
+                        evutil_offsetof(struct bufferevent_pair, bev) );
+       assert(bev_p->bev.be_ops == &bufferevent_ops_pair);
+       return bev_p;
+}
+
+#define downcast(bev_pair) (&(bev_pair)->bev)
+
+/* XXX Handle close */
+
+static void be_pair_outbuf_cb(struct evbuffer *,
+    const struct evbuffer_cb_info *, void *);
+
+static void
+run_callback(struct deferred_cb *cb, void *arg)
+{
+       struct bufferevent_pair *bufev = arg;
+       struct bufferevent *bev = downcast(bufev);
+
+       if (cb == &bufev->deferred_read_cb) {
+               if (bev->readcb) {
+                       bev->readcb(bev, bev->cbarg);
+               }
+       } else {
+               if (bev->writecb) {
+                       bev->writecb(bev, bev->cbarg);
+               }
+       }
+}
+
+static struct bufferevent_pair *
+bufferevent_pair_elt_new(struct event_base *base,
+    enum bufferevent_options options)
+{
+       struct bufferevent_pair *bufev;
+       if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair))))
+               return NULL;
+       if (bufferevent_init_common(&bufev->bev, base, &bufferevent_ops_pair,
+               options)) {
+               mm_free(bufev);
+               return NULL;
+       }
+       /* XXX set read timeout event */
+       /* XXX set write timeout event */
+       if (!evbuffer_add_cb(bufev->bev.output, be_pair_outbuf_cb, bufev)) {
+               bufferevent_free(downcast(bufev));
+               return NULL;
+       }
+       event_deferred_cb_init(&bufev->deferred_read_cb, run_callback, bufev);
+       event_deferred_cb_init(&bufev->deferred_write_cb, run_callback, bufev);
+
+       return bufev;
+}
+
+int
+bufferevent_pair_new(struct event_base *base, enum bufferevent_options options,
+    struct bufferevent *pair[2])
+{
+        struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
+
+       bufev1 = bufferevent_pair_elt_new(base, options);
+       if (!bufev1)
+               return -1;
+       bufev2 = bufferevent_pair_elt_new(base, options);
+       if (!bufev2) {
+               bufferevent_free(downcast(bufev1));
+               return -1;
+       }
+
+       bufev1->partner = bufev2;
+       bufev2->partner = bufev1;
+
+       pair[0] = downcast(bufev1);
+       pair[1] = downcast(bufev2);
+
+       return 0;
+}
+
+static void
+be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
+    int ignore_wm)
+{
+       size_t src_size, dst_size;
+       size_t n;
+
+       if (dst->wm_read.high) {
+               size_t dst_size = evbuffer_get_length(dst->input);
+               if (dst_size < dst->wm_read.high) {
+                       n = dst->wm_read.high - dst_size;
+                       evbuffer_remove_buffer(src->output, dst->input, n);
+               } else {
+                       if (!ignore_wm)
+                               return;
+                       evbuffer_add_buffer(dst->input, src->output);
+               }
+       } else {
+               evbuffer_add_buffer(dst->input, src->output);
+       }
+
+       src_size = evbuffer_get_length(src->output);
+       dst_size = evbuffer_get_length(dst->input);
+
+       if (dst_size >= dst->wm_read.low && dst->readcb) {
+               event_deferred_cb_schedule(dst->ev_base,
+                   &(upcast(dst)->deferred_read_cb));
+       }
+       if (src_size <= src->wm_write.low && src->writecb) {
+               event_deferred_cb_schedule(src->ev_base,
+                   &(upcast(src)->deferred_write_cb));
+       }
+}
+
+static inline int
+be_pair_wants_to_talk(struct bufferevent *src, struct bufferevent *dst)
+{
+       return (src->enabled & EV_WRITE) &&
+           (dst->enabled & EV_READ) && !dst->read_suspended &&
+           evbuffer_get_length(src->output);
+}
+
+static void
+be_pair_outbuf_cb(struct evbuffer *outbuf,
+    const struct evbuffer_cb_info *info, void *arg)
+{
+       struct bufferevent_pair *bev_pair = arg;
+       struct bufferevent_pair *partner = bev_pair->partner;
+
+       if (info->n_added > info->n_deleted && partner) {
+               /* We got more data.  If the other side's reading, then
+                  hand it over. */
+               if (be_pair_wants_to_talk(downcast(bev_pair),
+                       downcast(partner))) {
+                       be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
+               }
+       }
+}
+
+static int
+be_pair_enable(struct bufferevent *bufev, short events)
+{
+       struct bufferevent_pair *bev_p = upcast(bufev);
+       struct bufferevent_pair *partner = bev_p->partner;
+
+       /* We're starting to read! Does the other side have anything to write?*/
+       if ((events & EV_READ) && partner &&
+           be_pair_wants_to_talk(downcast(partner), bufev)) {
+               be_pair_transfer(downcast(partner), bufev, 0);
+       }
+       /* We're starting to write! Does the other side want to read? */
+       if ((events & EV_WRITE) && partner &&
+           be_pair_wants_to_talk(bufev, downcast(partner))) {
+               be_pair_transfer(bufev, downcast(partner), 0);
+       }
+       return 0;
+}
+
+static int
+be_pair_disable(struct bufferevent *bev, short events)
+{
+       return 0;
+}
+
+static void
+be_pair_destruct(struct bufferevent *bev)
+{
+       struct bufferevent_pair *bev_p = upcast(bev);
+
+       if (bev_p->partner) {
+               bev_p->partner->partner = NULL;
+               bev_p->partner = NULL;
+       }
+       event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_write_cb);
+       event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_read_cb);
+}
+
+static void
+be_pair_adj_timeouts(struct bufferevent *bev)
+{
+       /* TODO: implement. */
+}
+
+static int
+be_pair_flush(struct bufferevent *bev, short iotype,
+    enum bufferevent_flush_mode mode)
+{
+       struct bufferevent_pair *bev_p = upcast(bev);
+       struct bufferevent *partner;
+       if (!bev_p->partner)
+               return -1;
+
+       partner = downcast(bev_p->partner);
+
+       if (mode == BEV_NORMAL)
+               return 0;
+
+       if ((iotype & EV_READ) != 0)
+               be_pair_transfer(partner, bev, 1);
+
+       if ((iotype & EV_WRITE) != 0)
+               be_pair_transfer(bev, partner, 1);
+
+       if (mode == BEV_FINISHED) {
+               if (partner->errorcb)
+                       (*partner->errorcb)(partner,
+                           iotype|EVBUFFER_EOF, partner->cbarg);
+       }
+       return 0;
+}
+
+const struct bufferevent_ops bufferevent_ops_pair = {
+       "pair_elt",
+       evutil_offsetof(struct bufferevent_pair, bev),
+       be_pair_enable,
+       be_pair_disable,
+       be_pair_destruct,
+       be_pair_adj_timeouts,
+       be_pair_flush,
+};
index 391d99b1ef5654fc1ac22ae7ef47e6a85d952e20..561bec79e961a23bcd748ccc21772723fe827713 100644 (file)
@@ -70,7 +70,7 @@ static void be_socket_destruct(struct bufferevent *);
 static void be_socket_adj_timeouts(struct bufferevent *);
 static int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
 
-struct bufferevent_ops bufferevent_ops_socket = {
+const struct bufferevent_ops bufferevent_ops_socket = {
        "socket",
        0,
        be_socket_enable,
index 6cb753303e13b06ca795e7fd2c8426e453d4cabe..0da1fb7accfa3dcb39753001ef0041393c6ee302 100644 (file)
@@ -405,6 +405,11 @@ bufferevent_filter_new(struct bufferevent *underlying,
                       void (*free_context)(void *),
                       void *ctx);
 
+/** Allocate a pair of linked bufferevents DOCDOC */
+int
+bufferevent_pair_new(struct event_base *base, enum bufferevent_options options,
+    struct bufferevent *pair[2]);
+
 #ifdef __cplusplus
 }
 #endif
index 5578d320bf1ba83ce738a02d349dc846d0f77574..53c29925c949a8a5df1130907581bacd96d77931 100644 (file)
@@ -109,14 +109,23 @@ errorcb(struct bufferevent *bev, short what, void *arg)
 }
 
 static void
-test_bufferevent(void)
+test_bufferevent_impl(int use_pair)
 {
-       struct bufferevent *bev1, *bev2;
+       struct bufferevent *bev1 = NULL, *bev2 = NULL;
        char buffer[8333];
        int i;
 
-       bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL);
-       bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL);
+       if (use_pair) {
+               struct bufferevent *pair[2];
+               tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
+               bev1 = pair[0];
+               bev2 = pair[1];
+               bufferevent_setcb(bev1, readcb, writecb, errorcb, NULL);
+               bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL);
+       } else {
+               bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL);
+               bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL);
+       }
 
        bufferevent_disable(bev1, EV_READ);
        bufferevent_enable(bev2, EV_READ);
@@ -133,7 +142,20 @@ test_bufferevent(void)
 
        if (test_ok != 2)
                test_ok = 0;
+end:
+       ;
+}
+
+static void
+test_bufferevent(void)
+{
+       test_bufferevent_impl(0);
+}
 
+static void
+test_bufferevent_pair(void)
+{
+       test_bufferevent_impl(1);
 }
 
 /*
@@ -180,23 +202,30 @@ wm_errorcb(struct bufferevent *bev, short what, void *arg)
 }
 
 static void
-test_bufferevent_watermarks(void)
+test_bufferevent_watermarks_impl(int use_pair)
 {
-       struct bufferevent *bev1, *bev2;
+       struct bufferevent *bev1 = NULL, *bev2 = NULL;
        char buffer[65000];
        int i;
-
-       bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL);
-       bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL);
-
+       test_ok = 0;
+
+       if (use_pair) {
+               struct bufferevent *pair[2];
+               tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
+               bev1 = pair[0];
+               bev2 = pair[1];
+               bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL);
+               bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL);
+       } else {
+               bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL);
+               bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL);
+       }
        bufferevent_disable(bev1, EV_READ);
        bufferevent_enable(bev2, EV_READ);
 
        for (i = 0; i < sizeof(buffer); i++)
                buffer[i] = (char)i;
 
-       bufferevent_write(bev1, buffer, sizeof(buffer));
-
        /* limit the reading on the receiving bufferevent */
        bufferevent_setwatermark(bev2, EV_READ, 10, 20);
 
@@ -204,6 +233,8 @@ test_bufferevent_watermarks(void)
            100 bytes. */
         bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000);
 
+       bufferevent_write(bev1, buffer, sizeof(buffer));
+
        event_dispatch();
 
        tt_int_op(test_ok, ==, 2);
@@ -217,6 +248,18 @@ end:
        bufferevent_free(bev2);
 }
 
+static void
+test_bufferevent_watermarks(void)
+{
+       test_bufferevent_watermarks_impl(0);
+}
+
+static void
+test_bufferevent_pair_watermarks(void)
+{
+       test_bufferevent_watermarks_impl(1);
+}
+
 /*
  * Test bufferevent filters
  */
@@ -264,16 +307,23 @@ bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
 
 
 static void
-test_bufferevent_filters(void)
+test_bufferevent_filters_impl(int use_pair)
 {
-       struct bufferevent *bev1, *bev2;
+       struct bufferevent *bev1 = NULL, *bev2 = NULL;
        char buffer[8333];
        int i;
 
         test_ok = 0;
 
-       bev1 = bufferevent_socket_new(NULL, pair[0], 0);
-       bev2 = bufferevent_socket_new(NULL, pair[1], 0);
+       if (use_pair) {
+               struct bufferevent *pair[2];
+               tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
+               bev1 = pair[0];
+               bev2 = pair[1];
+       } else {
+               bev1 = bufferevent_socket_new(NULL, pair[0], 0);
+               bev2 = bufferevent_socket_new(NULL, pair[1], 0);
+       }
 
        for (i = 0; i < sizeof(buffer); i++)
                buffer[i] = i;
@@ -293,18 +343,35 @@ test_bufferevent_filters(void)
 
        event_dispatch();
 
+       if (test_ok != 2)
+               test_ok = 0;
+
+end:
        bufferevent_free(bev1);
        bufferevent_free(bev2);
 
-       if (test_ok != 2)
-               test_ok = 0;
+}
+
+static void
+test_bufferevent_filters(void)
+{
+       test_bufferevent_filters_impl(0);
+}
+
+static void
+test_bufferevent_pair_filters(void)
+{
+       test_bufferevent_filters_impl(1);
 }
 
 struct testcase_t bufferevent_testcases[] = {
 
         LEGACY(bufferevent, TT_ISOLATED),
+        LEGACY(bufferevent_pair, TT_ISOLATED),
         LEGACY(bufferevent_watermarks, TT_ISOLATED),
+        LEGACY(bufferevent_pair_watermarks, TT_ISOLATED),
         LEGACY(bufferevent_filters, TT_ISOLATED),
+        LEGACY(bufferevent_pair_filters, TT_ISOLATED),
 #ifdef _EVENT_HAVE_LIBZ
         LEGACY(bufferevent_zlib, TT_ISOLATED),
 #else