From: Nick Mathewson Date: Fri, 10 Apr 2009 15:01:31 +0000 (+0000) Subject: Add a linked-pair abstraction to bufferevents. X-Git-Tag: release-2.0.1-alpha~37 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=23085c92477035507c499530d4a200bceee5d8a1;p=libevent Add a linked-pair abstraction to bufferevents. The new bufferevent_pair abstraction works like a set of buferevent_sockets connected by a socketpair, except that it doesn't require a socketpair, and therefore doesn't need to get the kernel involved. It's also a good way to make sure that deferred callbacks work. It's a good use case for deferred callbacks: before I implemented them, the recursive relationship between the evbuffer callback and the read callback would make the unit tests overflow the stack. svn:r1152 --- diff --git a/Makefile.am b/Makefile.am index 13d13546..1c7cb87e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -90,6 +90,7 @@ event-config.h: config.h CORE_SRC = event.c buffer.c \ bufferevent.c bufferevent_sock.c bufferevent_filter.c \ + bufferevent_pair.c \ evmap.c log.c evutil.c strlcpy.c $(SYS_SRC) EXTRA_SRC = event_tagging.c http.c evdns.c evrpc.c diff --git a/bufferevent-internal.h b/bufferevent-internal.h index a08dfe86..1d4b80d1 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -76,8 +76,9 @@ struct bufferevent_ops { int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode); }; -extern const struct bufferevent_ops be_ops_socket; -extern const struct bufferevent_ops be_ops_filter; +extern const struct bufferevent_ops bufferevent_ops_socket; +extern const struct bufferevent_ops bufferevent_ops_filter; +extern const struct bufferevent_ops bufferevent_ops_pair; /** Initialize the shared parts of a bufferevent. */ int bufferevent_init_common(struct bufferevent *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options); diff --git a/bufferevent_filter.c b/bufferevent_filter.c index ed3e4122..fcd090be 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -97,7 +97,7 @@ struct bufferevent_filtered { void *context; }; -struct bufferevent_ops bufferevent_ops_filter = { +const struct bufferevent_ops bufferevent_ops_filter = { "filter", evutil_offsetof(struct bufferevent_filtered, bev), be_filter_enable, diff --git a/bufferevent_pair.c b/bufferevent_pair.c new file mode 100644 index 00000000..5e12ae8a --- /dev/null +++ b/bufferevent_pair.c @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2009 Niels Provos, Nick Mathewson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include + +#ifdef HAVE_CONFIG_H +#include "event-config.h" +#endif + +#include "event2/util.h" +#include "event2/buffer.h" +#include "event2/bufferevent.h" +#include "event2/bufferevent_struct.h" +#include "event2/event.h" +#include "defer-internal.h" +#include "bufferevent-internal.h" +#include "mm-internal.h" + +struct bufferevent_pair { + struct bufferevent bev; + struct bufferevent_pair *partner; + struct deferred_cb deferred_write_cb; + struct deferred_cb deferred_read_cb; +}; + + +/* Given a bufferevent that's really a bev part of a bufferevent_pair, + * return that bufferevent_filtered. Returns NULL otherwise.*/ +static inline struct bufferevent_pair * +upcast(struct bufferevent *bev) +{ + struct bufferevent_pair *bev_p; + if (bev->be_ops != &bufferevent_ops_pair) + return NULL; + bev_p = (void*)( ((char*)bev) - + evutil_offsetof(struct bufferevent_pair, bev) ); + assert(bev_p->bev.be_ops == &bufferevent_ops_pair); + return bev_p; +} + +#define downcast(bev_pair) (&(bev_pair)->bev) + +/* XXX Handle close */ + +static void be_pair_outbuf_cb(struct evbuffer *, + const struct evbuffer_cb_info *, void *); + +static void +run_callback(struct deferred_cb *cb, void *arg) +{ + struct bufferevent_pair *bufev = arg; + struct bufferevent *bev = downcast(bufev); + + if (cb == &bufev->deferred_read_cb) { + if (bev->readcb) { + bev->readcb(bev, bev->cbarg); + } + } else { + if (bev->writecb) { + bev->writecb(bev, bev->cbarg); + } + } +} + +static struct bufferevent_pair * +bufferevent_pair_elt_new(struct event_base *base, + enum bufferevent_options options) +{ + struct bufferevent_pair *bufev; + if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) + return NULL; + if (bufferevent_init_common(&bufev->bev, base, &bufferevent_ops_pair, + options)) { + mm_free(bufev); + return NULL; + } + /* XXX set read timeout event */ + /* XXX set write timeout event */ + if (!evbuffer_add_cb(bufev->bev.output, be_pair_outbuf_cb, bufev)) { + bufferevent_free(downcast(bufev)); + return NULL; + } + event_deferred_cb_init(&bufev->deferred_read_cb, run_callback, bufev); + event_deferred_cb_init(&bufev->deferred_write_cb, run_callback, bufev); + + return bufev; +} + +int +bufferevent_pair_new(struct event_base *base, enum bufferevent_options options, + struct bufferevent *pair[2]) +{ + struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; + + bufev1 = bufferevent_pair_elt_new(base, options); + if (!bufev1) + return -1; + bufev2 = bufferevent_pair_elt_new(base, options); + if (!bufev2) { + bufferevent_free(downcast(bufev1)); + return -1; + } + + bufev1->partner = bufev2; + bufev2->partner = bufev1; + + pair[0] = downcast(bufev1); + pair[1] = downcast(bufev2); + + return 0; +} + +static void +be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, + int ignore_wm) +{ + size_t src_size, dst_size; + size_t n; + + if (dst->wm_read.high) { + size_t dst_size = evbuffer_get_length(dst->input); + if (dst_size < dst->wm_read.high) { + n = dst->wm_read.high - dst_size; + evbuffer_remove_buffer(src->output, dst->input, n); + } else { + if (!ignore_wm) + return; + evbuffer_add_buffer(dst->input, src->output); + } + } else { + evbuffer_add_buffer(dst->input, src->output); + } + + src_size = evbuffer_get_length(src->output); + dst_size = evbuffer_get_length(dst->input); + + if (dst_size >= dst->wm_read.low && dst->readcb) { + event_deferred_cb_schedule(dst->ev_base, + &(upcast(dst)->deferred_read_cb)); + } + if (src_size <= src->wm_write.low && src->writecb) { + event_deferred_cb_schedule(src->ev_base, + &(upcast(src)->deferred_write_cb)); + } +} + +static inline int +be_pair_wants_to_talk(struct bufferevent *src, struct bufferevent *dst) +{ + return (src->enabled & EV_WRITE) && + (dst->enabled & EV_READ) && !dst->read_suspended && + evbuffer_get_length(src->output); +} + +static void +be_pair_outbuf_cb(struct evbuffer *outbuf, + const struct evbuffer_cb_info *info, void *arg) +{ + struct bufferevent_pair *bev_pair = arg; + struct bufferevent_pair *partner = bev_pair->partner; + + if (info->n_added > info->n_deleted && partner) { + /* We got more data. If the other side's reading, then + hand it over. */ + if (be_pair_wants_to_talk(downcast(bev_pair), + downcast(partner))) { + be_pair_transfer(downcast(bev_pair), downcast(partner), 0); + } + } +} + +static int +be_pair_enable(struct bufferevent *bufev, short events) +{ + struct bufferevent_pair *bev_p = upcast(bufev); + struct bufferevent_pair *partner = bev_p->partner; + + /* We're starting to read! Does the other side have anything to write?*/ + if ((events & EV_READ) && partner && + be_pair_wants_to_talk(downcast(partner), bufev)) { + be_pair_transfer(downcast(partner), bufev, 0); + } + /* We're starting to write! Does the other side want to read? */ + if ((events & EV_WRITE) && partner && + be_pair_wants_to_talk(bufev, downcast(partner))) { + be_pair_transfer(bufev, downcast(partner), 0); + } + return 0; +} + +static int +be_pair_disable(struct bufferevent *bev, short events) +{ + return 0; +} + +static void +be_pair_destruct(struct bufferevent *bev) +{ + struct bufferevent_pair *bev_p = upcast(bev); + + if (bev_p->partner) { + bev_p->partner->partner = NULL; + bev_p->partner = NULL; + } + event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_write_cb); + event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_read_cb); +} + +static void +be_pair_adj_timeouts(struct bufferevent *bev) +{ + /* TODO: implement. */ +} + +static int +be_pair_flush(struct bufferevent *bev, short iotype, + enum bufferevent_flush_mode mode) +{ + struct bufferevent_pair *bev_p = upcast(bev); + struct bufferevent *partner; + if (!bev_p->partner) + return -1; + + partner = downcast(bev_p->partner); + + if (mode == BEV_NORMAL) + return 0; + + if ((iotype & EV_READ) != 0) + be_pair_transfer(partner, bev, 1); + + if ((iotype & EV_WRITE) != 0) + be_pair_transfer(bev, partner, 1); + + if (mode == BEV_FINISHED) { + if (partner->errorcb) + (*partner->errorcb)(partner, + iotype|EVBUFFER_EOF, partner->cbarg); + } + return 0; +} + +const struct bufferevent_ops bufferevent_ops_pair = { + "pair_elt", + evutil_offsetof(struct bufferevent_pair, bev), + be_pair_enable, + be_pair_disable, + be_pair_destruct, + be_pair_adj_timeouts, + be_pair_flush, +}; diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 391d99b1..561bec79 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -70,7 +70,7 @@ static void be_socket_destruct(struct bufferevent *); static void be_socket_adj_timeouts(struct bufferevent *); static int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode); -struct bufferevent_ops bufferevent_ops_socket = { +const struct bufferevent_ops bufferevent_ops_socket = { "socket", 0, be_socket_enable, diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index 6cb75330..0da1fb7a 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -405,6 +405,11 @@ bufferevent_filter_new(struct bufferevent *underlying, void (*free_context)(void *), void *ctx); +/** Allocate a pair of linked bufferevents DOCDOC */ +int +bufferevent_pair_new(struct event_base *base, enum bufferevent_options options, + struct bufferevent *pair[2]); + #ifdef __cplusplus } #endif diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 5578d320..53c29925 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -109,14 +109,23 @@ errorcb(struct bufferevent *bev, short what, void *arg) } static void -test_bufferevent(void) +test_bufferevent_impl(int use_pair) { - struct bufferevent *bev1, *bev2; + struct bufferevent *bev1 = NULL, *bev2 = NULL; char buffer[8333]; int i; - bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL); - bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL); + if (use_pair) { + struct bufferevent *pair[2]; + tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); + bev1 = pair[0]; + bev2 = pair[1]; + bufferevent_setcb(bev1, readcb, writecb, errorcb, NULL); + bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL); + } else { + bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL); + bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL); + } bufferevent_disable(bev1, EV_READ); bufferevent_enable(bev2, EV_READ); @@ -133,7 +142,20 @@ test_bufferevent(void) if (test_ok != 2) test_ok = 0; +end: + ; +} + +static void +test_bufferevent(void) +{ + test_bufferevent_impl(0); +} +static void +test_bufferevent_pair(void) +{ + test_bufferevent_impl(1); } /* @@ -180,23 +202,30 @@ wm_errorcb(struct bufferevent *bev, short what, void *arg) } static void -test_bufferevent_watermarks(void) +test_bufferevent_watermarks_impl(int use_pair) { - struct bufferevent *bev1, *bev2; + struct bufferevent *bev1 = NULL, *bev2 = NULL; char buffer[65000]; int i; - - bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL); - bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL); - + test_ok = 0; + + if (use_pair) { + struct bufferevent *pair[2]; + tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); + bev1 = pair[0]; + bev2 = pair[1]; + bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL); + bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL); + } else { + bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL); + bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL); + } bufferevent_disable(bev1, EV_READ); bufferevent_enable(bev2, EV_READ); for (i = 0; i < sizeof(buffer); i++) buffer[i] = (char)i; - bufferevent_write(bev1, buffer, sizeof(buffer)); - /* limit the reading on the receiving bufferevent */ bufferevent_setwatermark(bev2, EV_READ, 10, 20); @@ -204,6 +233,8 @@ test_bufferevent_watermarks(void) 100 bytes. */ bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000); + bufferevent_write(bev1, buffer, sizeof(buffer)); + event_dispatch(); tt_int_op(test_ok, ==, 2); @@ -217,6 +248,18 @@ end: bufferevent_free(bev2); } +static void +test_bufferevent_watermarks(void) +{ + test_bufferevent_watermarks_impl(0); +} + +static void +test_bufferevent_pair_watermarks(void) +{ + test_bufferevent_watermarks_impl(1); +} + /* * Test bufferevent filters */ @@ -264,16 +307,23 @@ bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst, static void -test_bufferevent_filters(void) +test_bufferevent_filters_impl(int use_pair) { - struct bufferevent *bev1, *bev2; + struct bufferevent *bev1 = NULL, *bev2 = NULL; char buffer[8333]; int i; test_ok = 0; - bev1 = bufferevent_socket_new(NULL, pair[0], 0); - bev2 = bufferevent_socket_new(NULL, pair[1], 0); + if (use_pair) { + struct bufferevent *pair[2]; + tt_assert(0 == bufferevent_pair_new(NULL, 0, pair)); + bev1 = pair[0]; + bev2 = pair[1]; + } else { + bev1 = bufferevent_socket_new(NULL, pair[0], 0); + bev2 = bufferevent_socket_new(NULL, pair[1], 0); + } for (i = 0; i < sizeof(buffer); i++) buffer[i] = i; @@ -293,18 +343,35 @@ test_bufferevent_filters(void) event_dispatch(); + if (test_ok != 2) + test_ok = 0; + +end: bufferevent_free(bev1); bufferevent_free(bev2); - if (test_ok != 2) - test_ok = 0; +} + +static void +test_bufferevent_filters(void) +{ + test_bufferevent_filters_impl(0); +} + +static void +test_bufferevent_pair_filters(void) +{ + test_bufferevent_filters_impl(1); } struct testcase_t bufferevent_testcases[] = { LEGACY(bufferevent, TT_ISOLATED), + LEGACY(bufferevent_pair, TT_ISOLATED), LEGACY(bufferevent_watermarks, TT_ISOLATED), + LEGACY(bufferevent_pair_watermarks, TT_ISOLATED), LEGACY(bufferevent_filters, TT_ISOLATED), + LEGACY(bufferevent_pair_filters, TT_ISOLATED), #ifdef _EVENT_HAVE_LIBZ LEGACY(bufferevent_zlib, TT_ISOLATED), #else